From b9bf2f6e73b3ab8c4c6c5c8d9e4a2700cb7171c6 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Thu, 14 Oct 2021 13:03:57 -0500 Subject: [PATCH 1/7] Download video to sync to local cache --- local/local.go | 205 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 203 insertions(+), 2 deletions(-) diff --git a/local/local.go b/local/local.go index cacc57a..4bb23e3 100644 --- a/local/local.go +++ b/local/local.go @@ -1,22 +1,223 @@ package local import ( + "encoding/json" + "errors" "fmt" + "io/ioutil" + "time" + "os" + "os/exec" + "path" + "strings" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" + + "github.com/lbryio/lbry.go/v2/extras/jsonrpc" ) +type SyncContext struct { + TempDir string + LbrynetAddr string +} + +func (c *SyncContext) Validate() error { + if c.TempDir == "" { + return errors.New("No TempDir provided") + } + if c.LbrynetAddr == "" { + return errors.New("No Lbrynet address provided") + } + return nil +} + +var syncContext SyncContext + func AddCommand(rootCmd *cobra.Command) { cmd := &cobra.Command{ Use: "local", Short: "run a personal ytsync", Run: localCmd, + Args: cobra.ExactArgs(1), } - //cmd.Flags().StringVar(&cache, "cache", "", "path to cache") + cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") rootCmd.AddCommand(cmd) +} +func getEnvDefault(key, defaultValue string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return defaultValue } func localCmd(cmd *cobra.Command, args []string) { - fmt.Println("local") + err := syncContext.Validate() + if err != nil { + log.Error(err) + return + } + fmt.Println(syncContext.LbrynetAddr) + + videoID := args[0] + fmt.Println(videoID) + + lbrynet := jsonrpc.NewClient(syncContext.LbrynetAddr) + lbrynet.SetRPCTimeout(5 * time.Minute) + + status, err := lbrynet.Status() + if err != nil { + log.Error(err) + return + } + + fmt.Println(status.IsRunning) + fmt.Println(status.Wallet.Connected) + + videoBasePath := path.Join(syncContext.TempDir, videoID) + + _, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) + if err != nil { + log.Errorf("Error getting video metadata: %v", err) + return + } + + err = downloadVideo(videoBasePath, videoMetadataPath) + if err != nil { + log.Errorf("Error downloading video: %v", err) + } + + log.Info("Done") +} + +func getVideoMetadata(basePath, videoID string) (*ytdl.YtdlVideo, string, error) { + metadataPath := basePath + ".info.json" + + _, err := os.Stat(metadataPath) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if video metadata already exists: %v", err) + return nil, "", err + } else if err == nil { + log.Debugf("Video metadata file %s already exists. Attempting to load existing file.", metadataPath) + videoMetadata, err := loadVideoMetadata(metadataPath) + if err != nil { + log.Debugf("Error loading pre-existing video metadata: %v. Deleting file and attempting re-download.", err) + } else { + return videoMetadata, metadataPath, nil + } + } + + if err := downloadVideoMetadata(basePath, videoID); err != nil { + return nil, "", err + } + + videoMetadata, err := loadVideoMetadata(metadataPath) + return videoMetadata, metadataPath, err +} + +func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + metadataBytes, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + var videoMetadata *ytdl.YtdlVideo + err = json.Unmarshal(metadataBytes, &videoMetadata) + if err != nil { + return nil, err + } + + return videoMetadata, nil +} + +func downloadVideoMetadata(basePath, videoID string) error { + ytdlArgs := []string{ + "--skip-download", + "--write-info-json", + "--force-overwrites", + fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID), + "--cookies", + "cookies.txt", + "-o", + basePath, + } + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func downloadVideo(basePath, metadataPath string) error { + ytdlArgs := []string{ + "--no-progress", + "-o", + basePath, + "--merge-output-format", + "mp4", + "--postprocessor-args", + "ffmpeg:-movflags faststart", + "--abort-on-unavailable-fragment", + "--fragment-retries", + "1", + "--cookies", + "cookies.txt", + "--extractor-args", + "youtube:player_client=android", + "--load-info-json", + metadataPath, + "-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]", + } + + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func runCmd(cmd *exec.Cmd) ([]string, error) { + log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) + var err error + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + outLog, err := ioutil.ReadAll(stdout) + if err != nil { + return nil, err + } + errorLog, err := ioutil.ReadAll(stderr) + if err != nil { + return nil, err + } + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + if err != nil { + log.Error(string(errorLog)) + return nil, err + } + return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil + } } -- 2.45.3 From 8ea15afce855a3bd9819ce68f5dcfc91939465d0 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Mon, 25 Oct 2021 16:20:12 -0500 Subject: [PATCH 2/7] Basic stream publishing. Still needs some work. --- local/local.go | 166 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 156 insertions(+), 10 deletions(-) diff --git a/local/local.go b/local/local.go index 4bb23e3..284d7ef 100644 --- a/local/local.go +++ b/local/local.go @@ -9,19 +9,27 @@ import ( "os" "os/exec" "path" + "regexp" + "sort" "strings" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/abadojack/whatlanggo" "github.com/lbryio/ytsync/v5/downloader/ytdl" + "github.com/lbryio/ytsync/v5/namer" + "github.com/lbryio/ytsync/v5/tags_manager" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/lbryio/lbry.go/v2/extras/util" ) type SyncContext struct { TempDir string LbrynetAddr string + ChannelID string + PublishBid float64 } func (c *SyncContext) Validate() error { @@ -31,6 +39,12 @@ func (c *SyncContext) Validate() error { if c.LbrynetAddr == "" { return errors.New("No Lbrynet address provided") } + if c.ChannelID == "" { + return errors.New("No channel ID provided") + } + if c.PublishBid <= 0.0 { + return errors.New("Publish bid is not greater than zero") + } return nil } @@ -44,7 +58,9 @@ func AddCommand(rootCmd *cobra.Command) { Args: cobra.ExactArgs(1), } cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") + cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") rootCmd.AddCommand(cmd) } @@ -75,12 +91,25 @@ func localCmd(cmd *cobra.Command, args []string) { return } - fmt.Println(status.IsRunning) - fmt.Println(status.Wallet.Connected) + if !status.IsRunning { + log.Error("SDK is not running") + return + } + + // Should check to see if the SDK owns the channel + + // Should check to see if wallet is unlocked + // but jsonrpc.Client doesn't have WalletStatus method + // so skip for now + + // Should check to see if streams are configured to be reflected and warn if not + // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected + // so use File.UploadingToReflector as a proxy for now + videoBasePath := path.Join(syncContext.TempDir, videoID) - _, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) + videoMetadata, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) if err != nil { log.Errorf("Error getting video metadata: %v", err) return @@ -89,6 +118,82 @@ func localCmd(cmd *cobra.Command, args []string) { err = downloadVideo(videoBasePath, videoMetadataPath) if err != nil { log.Errorf("Error downloading video: %v", err) + return + } + + tags, err := tags_manager.SanitizeTags(videoMetadata.Tags, syncContext.ChannelID) + if err != nil { + log.Errorf("Error sanitizing tags: %v", err) + return + } + + urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) + descriptionSample := urlsRegex.ReplaceAllString(videoMetadata.Description, "") + info := whatlanggo.Detect(descriptionSample) + info2 := whatlanggo.Detect(videoMetadata.Title) + var languages []string = nil + if info.IsReliable() && info.Lang.Iso6391() != "" { + language := info.Lang.Iso6391() + languages = []string{language} + } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { + language := info2.Lang.Iso6391() + languages = []string{language} + } + // Thumbnail and ReleaseTime need to be properly determined + streamCreateOptions := jsonrpc.StreamCreateOptions { + ClaimCreateOptions: jsonrpc.ClaimCreateOptions { + Title: &videoMetadata.Title, + Description: util.PtrToString(getAbbrevDescription(videoMetadata)), + Languages: languages, + //ThumbnailURL: &v.thumbnailURL, + Tags: tags, + }, + ReleaseTime: util.PtrToInt64(time.Now().Unix()), + ChannelID: &syncContext.ChannelID, + License: util.PtrToString("Copyrighted (contact publisher)"), + } + + videoPath, err := getVideoDownloadedPath(syncContext.TempDir, videoID) + if err != nil { + log.Errorf("Error determining downloaded video path: %v", err) + } + + fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Title) + fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Description) + fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Languages) + fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Tags) + + claimName := namer.NewNamer().GetNextName(videoMetadata.Title) + log.Infof("Publishing stream as %s", claimName) + + txSummary, err := lbrynet.StreamCreate(claimName, videoPath, syncContext.PublishBid, streamCreateOptions) + if err != nil { + log.Errorf("Error creating stream: %v", err) + return + } + + for { + fileListResponse, fileIndex, err := findFileByTxid(lbrynet, txSummary.Txid) + if err != nil { + log.Errorf("Error finding file by txid: %v", err) + return + } + if fileListResponse == nil { + log.Errorf("Could not find file in list with correct txid") + return + } + + fileStatus := fileListResponse.Items[fileIndex] + if fileStatus.IsFullyReflected { + log.Info("Stream is fully reflected") + break + } + if !fileStatus.UploadingToReflector { + log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + break + } + log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) + time.Sleep(5 * time.Second) } log.Info("Done") @@ -120,13 +225,7 @@ func getVideoMetadata(basePath, videoID string) (*ytdl.YtdlVideo, string, error) } func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { - f, err := os.Open(path) - if err != nil { - return nil, err - } - defer f.Close() - - metadataBytes, err := ioutil.ReadAll(f) + metadataBytes, err := os.ReadFile(path) if err != nil { return nil, err } @@ -221,3 +320,50 @@ func runCmd(cmd *exec.Cmd) ([]string, error) { return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil } } + +func getVideoDownloadedPath(videoDir, videoID string) (string, error) { + files, err := ioutil.ReadDir(videoDir) + if err != nil { + return "", err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) { + return path.Join(videoDir, f.Name()), nil + } + } + return "", errors.New("could not find any downloaded videos") + +} + +func getAbbrevDescription(v *ytdl.YtdlVideo) string { + maxLength := 2800 + description := strings.TrimSpace(v.Description) + additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.ID + if len(description) > maxLength { + description = description[:maxLength] + } + return description + "\n..." + additionalDescription +} + +// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed +func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { + response, err := client.FileList(0, 20) + for { + if err != nil { + log.Errorf("Error getting file list page: %v", err) + return nil, 0, err + } + index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) + if index < len(response.Items) { + return response, index, nil + } + if response.Page >= response.TotalPages { + return nil, 0, nil + } + response, err = client.FileList(response.Page + 1, 20) + } +} -- 2.45.3 From 2ba960ae01e6ba080c33c20b8bded43c3b515514 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Tue, 2 Nov 2021 15:46:03 -0500 Subject: [PATCH 3/7] Refactor to support future direction of development --- local/local.go | 306 +++++++++++++------------------------ local/localSDKPublisher.go | 120 +++++++++++++++ local/ytdl.go | 202 ++++++++++++++++++++++++ local/ytdlVideoSource.go | 54 +++++++ 4 files changed, 481 insertions(+), 201 deletions(-) create mode 100644 local/localSDKPublisher.go create mode 100644 local/ytdl.go create mode 100644 local/ytdlVideoSource.go diff --git a/local/local.go b/local/local.go index 284d7ef..1efd1fa 100644 --- a/local/local.go +++ b/local/local.go @@ -5,12 +5,9 @@ import ( "errors" "fmt" "io/ioutil" - "time" "os" - "os/exec" "path" "regexp" - "sort" "strings" log "github.com/sirupsen/logrus" @@ -20,9 +17,6 @@ import ( "github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/tags_manager" - - "github.com/lbryio/lbry.go/v2/extras/jsonrpc" - "github.com/lbryio/lbry.go/v2/extras/util" ) type SyncContext struct { @@ -80,122 +74,45 @@ func localCmd(cmd *cobra.Command, args []string) { fmt.Println(syncContext.LbrynetAddr) videoID := args[0] - fmt.Println(videoID) - lbrynet := jsonrpc.NewClient(syncContext.LbrynetAddr) - lbrynet.SetRPCTimeout(5 * time.Minute) + log.Debugf("Running sync for YouTube video ID %s", videoID) - status, err := lbrynet.Status() + var publisher VideoPublisher + publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) if err != nil { - log.Error(err) + log.Errorf("Error setting up publisher: %v", err) return } - if !status.IsRunning { - log.Error("SDK is not running") + var videoSource VideoSource + videoSource, err = NewYtdlVideoSource(syncContext.TempDir) + if err != nil { + log.Errorf("Error setting up video source: %v", err) return } - // Should check to see if the SDK owns the channel - - // Should check to see if wallet is unlocked - // but jsonrpc.Client doesn't have WalletStatus method - // so skip for now - - // Should check to see if streams are configured to be reflected and warn if not - // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected - // so use File.UploadingToReflector as a proxy for now - - - videoBasePath := path.Join(syncContext.TempDir, videoID) - - videoMetadata, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) + sourceVideo, err := videoSource.GetVideo(videoID) if err != nil { - log.Errorf("Error getting video metadata: %v", err) + log.Errorf("Error getting source video: %v", err) return } - err = downloadVideo(videoBasePath, videoMetadataPath) + processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) if err != nil { - log.Errorf("Error downloading video: %v", err) + log.Errorf("Error processing source video for publishing: %v", err) return } - tags, err := tags_manager.SanitizeTags(videoMetadata.Tags, syncContext.ChannelID) + done, err := publisher.Publish(*processedVideo) if err != nil { - log.Errorf("Error sanitizing tags: %v", err) + log.Errorf("Error publishing video: %v", err) return } - urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) - descriptionSample := urlsRegex.ReplaceAllString(videoMetadata.Description, "") - info := whatlanggo.Detect(descriptionSample) - info2 := whatlanggo.Detect(videoMetadata.Title) - var languages []string = nil - if info.IsReliable() && info.Lang.Iso6391() != "" { - language := info.Lang.Iso6391() - languages = []string{language} - } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { - language := info2.Lang.Iso6391() - languages = []string{language} - } - // Thumbnail and ReleaseTime need to be properly determined - streamCreateOptions := jsonrpc.StreamCreateOptions { - ClaimCreateOptions: jsonrpc.ClaimCreateOptions { - Title: &videoMetadata.Title, - Description: util.PtrToString(getAbbrevDescription(videoMetadata)), - Languages: languages, - //ThumbnailURL: &v.thumbnailURL, - Tags: tags, - }, - ReleaseTime: util.PtrToInt64(time.Now().Unix()), - ChannelID: &syncContext.ChannelID, - License: util.PtrToString("Copyrighted (contact publisher)"), - } - - videoPath, err := getVideoDownloadedPath(syncContext.TempDir, videoID) + err = <-done if err != nil { - log.Errorf("Error determining downloaded video path: %v", err) + log.Errorf("Error while wating for stream to reflect: %v", err) } - - fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Title) - fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Description) - fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Languages) - fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Tags) - - claimName := namer.NewNamer().GetNextName(videoMetadata.Title) - log.Infof("Publishing stream as %s", claimName) - - txSummary, err := lbrynet.StreamCreate(claimName, videoPath, syncContext.PublishBid, streamCreateOptions) - if err != nil { - log.Errorf("Error creating stream: %v", err) - return - } - - for { - fileListResponse, fileIndex, err := findFileByTxid(lbrynet, txSummary.Txid) - if err != nil { - log.Errorf("Error finding file by txid: %v", err) - return - } - if fileListResponse == nil { - log.Errorf("Could not find file in list with correct txid") - return - } - - fileStatus := fileListResponse.Items[fileIndex] - if fileStatus.IsFullyReflected { - log.Info("Stream is fully reflected") - break - } - if !fileStatus.UploadingToReflector { - log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") - break - } - log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) - time.Sleep(5 * time.Second) - } - log.Info("Done") } @@ -239,88 +156,6 @@ func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { return videoMetadata, nil } -func downloadVideoMetadata(basePath, videoID string) error { - ytdlArgs := []string{ - "--skip-download", - "--write-info-json", - "--force-overwrites", - fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID), - "--cookies", - "cookies.txt", - "-o", - basePath, - } - ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) - output, err := runCmd(ytdlCmd) - log.Debug(output) - return err -} - -func downloadVideo(basePath, metadataPath string) error { - ytdlArgs := []string{ - "--no-progress", - "-o", - basePath, - "--merge-output-format", - "mp4", - "--postprocessor-args", - "ffmpeg:-movflags faststart", - "--abort-on-unavailable-fragment", - "--fragment-retries", - "1", - "--cookies", - "cookies.txt", - "--extractor-args", - "youtube:player_client=android", - "--load-info-json", - metadataPath, - "-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]", - } - - ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) - output, err := runCmd(ytdlCmd) - log.Debug(output) - return err -} - -func runCmd(cmd *exec.Cmd) ([]string, error) { - log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) - var err error - stderr, err := cmd.StderrPipe() - if err != nil { - return nil, err - } - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - err = cmd.Start() - if err != nil { - return nil, err - } - outLog, err := ioutil.ReadAll(stdout) - if err != nil { - return nil, err - } - errorLog, err := ioutil.ReadAll(stderr) - if err != nil { - return nil, err - } - done := make(chan error, 1) - go func() { - done <- cmd.Wait() - }() - - select { - case err := <-done: - if err != nil { - log.Error(string(errorLog)) - return nil, err - } - return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil - } -} - func getVideoDownloadedPath(videoDir, videoID string) (string, error) { files, err := ioutil.ReadDir(videoDir) if err != nil { @@ -339,31 +174,100 @@ func getVideoDownloadedPath(videoDir, videoID string) (string, error) { } -func getAbbrevDescription(v *ytdl.YtdlVideo) string { +func getAbbrevDescription(v SourceVideo) string { + if v.Description == nil { + return v.SourceURL + } + maxLength := 2800 - description := strings.TrimSpace(v.Description) - additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.ID + description := strings.TrimSpace(*v.Description) + additionalDescription := "\n" + v.SourceURL if len(description) > maxLength { description = description[:maxLength] } return description + "\n..." + additionalDescription } -// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed -func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { - response, err := client.FileList(0, 20) - for { - if err != nil { - log.Errorf("Error getting file list page: %v", err) - return nil, 0, err - } - index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) - if index < len(response.Items) { - return response, index, nil - } - if response.Page >= response.TotalPages { - return nil, 0, nil - } - response, err = client.FileList(response.Page + 1, 20) - } +type SourceVideo struct { + ID string + Title *string + Description *string + SourceURL string + Languages []string + Tags []string + ReleaseTime *int64 + ThumbnailURL *string + FullLocalPath string +} + +type PublishableVideo struct { + ID string + ClaimName string + Title string + Description string + SourceURL string + Languages []string + Tags []string + ReleaseTime int64 + ThumbnailURL string + FullLocalPath string +} + +func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) { + tags, err := tags_manager.SanitizeTags(source.Tags, channelID) + if err != nil { + log.Errorf("Error sanitizing tags: %v", err) + return nil, err + } + + descriptionSample := "" + if source.Description != nil { + urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) + descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "") + } + info := whatlanggo.Detect(descriptionSample) + + title := "" + if source.Title != nil { + title = *source.Title + } + info2 := whatlanggo.Detect(title) + var languages []string = nil + if info.IsReliable() && info.Lang.Iso6391() != "" { + language := info.Lang.Iso6391() + languages = []string{language} + } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { + language := info2.Lang.Iso6391() + languages = []string{language} + } + + claimName := namer.NewNamer().GetNextName(title) + + thumbnailURL := "" + if source.ThumbnailURL != nil { + thumbnailURL = *source.ThumbnailURL + } + + processed := PublishableVideo { + ClaimName: claimName, + Title: title, + Description: getAbbrevDescription(source), + Languages: languages, + Tags: tags, + ReleaseTime: *source.ReleaseTime, + ThumbnailURL: thumbnailURL, + FullLocalPath: source.FullLocalPath, + } + + log.Debugf("Video prepared for publication: %v", processed) + + return &processed, nil +} + +type VideoSource interface { + GetVideo(id string) (*SourceVideo, error) +} + +type VideoPublisher interface { + Publish(video PublishableVideo) (chan error, error) } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go new file mode 100644 index 0000000..4e99355 --- /dev/null +++ b/local/localSDKPublisher.go @@ -0,0 +1,120 @@ +package local + +import ( + "errors" + "sort" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type LocalSDKPublisher struct { + channelID string + publishBid float64 + lbrynet *jsonrpc.Client +} + +func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) { + lbrynet := jsonrpc.NewClient(sdkAddr) + lbrynet.SetRPCTimeout(5 * time.Minute) + + status, err := lbrynet.Status() + if err != nil { + return nil, err + } + + if !status.IsRunning { + return nil, errors.New("SDK is not running") + } + + // Should check to see if the SDK owns the channel + + // Should check to see if wallet is unlocked + // but jsonrpc.Client doesn't have WalletStatus method + // so skip for now + + // Should check to see if streams are configured to be reflected and warn if not + // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected + // so use File.UploadingToReflector as a proxy for now + + publisher := LocalSDKPublisher { + channelID: channelID, + publishBid: publishBid, + lbrynet: lbrynet, + } + return &publisher, nil +} + +func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) { + streamCreateOptions := jsonrpc.StreamCreateOptions { + ClaimCreateOptions: jsonrpc.ClaimCreateOptions { + Title: &video.Title, + Description: &video.Description, + Languages: video.Languages, + ThumbnailURL: &video.ThumbnailURL, + Tags: video.Tags, + }, + ReleaseTime: &video.ReleaseTime, + ChannelID: &p.channelID, + License: util.PtrToString("Copyrighted (contact publisher)"), + } + + txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) + if err != nil { + return nil, err + } + + done := make(chan error, 1) + go func() { + for { + fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid) + if err != nil { + log.Errorf("Error finding file by txid: %v", err) + done <- err + return + } + if fileListResponse == nil { + log.Errorf("Could not find file in list with correct txid") + done <- err + return + } + + fileStatus := fileListResponse.Items[fileIndex] + if fileStatus.IsFullyReflected { + log.Info("Stream is fully reflected") + break + } + if !fileStatus.UploadingToReflector { + log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + break + } + log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) + time.Sleep(5 * time.Second) + } + done <- nil + }() + + return done, nil +} + +// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed +func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { + response, err := client.FileList(0, 20) + for { + if err != nil { + log.Errorf("Error getting file list page: %v", err) + return nil, 0, err + } + index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) + if index < len(response.Items) { + return response, index, nil + } + if response.Page >= response.TotalPages { + return nil, 0, nil + } + response, err = client.FileList(response.Page + 1, 20) + } +} diff --git a/local/ytdl.go b/local/ytdl.go new file mode 100644 index 0000000..712aa44 --- /dev/null +++ b/local/ytdl.go @@ -0,0 +1,202 @@ +package local + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "strings" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" +) + +type Ytdl struct { + DownloadDir string +} + +func NewYtdl(downloadDir string) (*Ytdl, error) { + // TODO validate download dir + + y := Ytdl { + DownloadDir: downloadDir, + } + + return &y, nil +} + +func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) { + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + return nil, err + } + + metadataBytes, err := os.ReadFile(metadataPath) + if err != nil { + return nil, err + } + + var metadata *ytdl.YtdlVideo + err = json.Unmarshal(metadataBytes, &metadata) + if err != nil { + return nil, err + } + + return metadata, nil +} + +func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) { + basePath := path.Join(y.DownloadDir, videoID) + metadataPath := basePath + ".info.json" + + _, err := os.Stat(metadataPath) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if video metadata already exists: %v", err) + return "", err + } else if err != nil { + log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID) + err = downloadVideoMetadata(basePath, videoID) + if err != nil { + return "", err + } + } + + return metadataPath, nil +} + +func (y *Ytdl) GetVideoFile(videoID string) (string, error) { + videoPath, err := findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + return "", err + } + + if videoPath != nil { + return *videoPath, nil + } + + basePath := path.Join(y.DownloadDir, videoID) + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + log.Errorf("Error getting metadata path in preparation for video download: %v", err) + return "", err + } + err = downloadVideo(basePath, metadataPath) + if err != nil { + return "", nil + } + + videoPath, err = findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err) + return "", err + } + if videoPath == nil { + return "", errors.New("Could not find a downloaded video after successful download.") + } + + return *videoPath, nil +} + +func findDownloadedVideo(videoDir, videoID string) (*string, error) { + files, err := ioutil.ReadDir(videoDir) + if err != nil { + return nil, err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) { + videoPath := path.Join(videoDir, f.Name()) + return &videoPath, nil + } + } + return nil, nil +} + +func downloadVideoMetadata(basePath, videoID string) error { + ytdlArgs := []string{ + "--skip-download", + "--write-info-json", + "--force-overwrites", + fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID), + "--cookies", + "cookies.txt", + "-o", + basePath, + } + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func downloadVideo(basePath, metadataPath string) error { + ytdlArgs := []string{ + "--no-progress", + "-o", + basePath, + "--merge-output-format", + "mp4", + "--postprocessor-args", + "ffmpeg:-movflags faststart", + "--abort-on-unavailable-fragment", + "--fragment-retries", + "1", + "--cookies", + "cookies.txt", + "--extractor-args", + "youtube:player_client=android", + "--load-info-json", + metadataPath, + "-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]", + } + + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func runCmd(cmd *exec.Cmd) ([]string, error) { + log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) + var err error + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + outLog, err := ioutil.ReadAll(stdout) + if err != nil { + return nil, err + } + errorLog, err := ioutil.ReadAll(stderr) + if err != nil { + return nil, err + } + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + if err != nil { + log.Error(string(errorLog)) + return nil, err + } + return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil + } +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go new file mode 100644 index 0000000..bc26884 --- /dev/null +++ b/local/ytdlVideoSource.go @@ -0,0 +1,54 @@ +package local + +import ( + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type YtdlVideoSource struct { + downloader Ytdl +} + +func NewYtdlVideoSource(downloadDir string) (*YtdlVideoSource, error) { + ytdl, err := NewYtdl(downloadDir) + if err != nil { + return nil, err + } + + source := YtdlVideoSource { + downloader: *ytdl, + } + + return &source, nil +} + +func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { + metadata, err := s.downloader.GetVideoMetadata(id) + if err != nil { + return nil, err + } + + videoPath, err := s.downloader.GetVideoFile(id) + if err != nil { + return nil, err + } + + sourceVideo := SourceVideo { + ID: id, + Title: &metadata.Title, + Description: &metadata.Description, + SourceURL: "\nhttps://www.youtube.com/watch?v=" + id, + Languages: []string{}, + Tags: metadata.Tags, + ReleaseTime: util.PtrToInt64(time.Now().Unix()), + ThumbnailURL: nil, + FullLocalPath: videoPath, + } + + log.Debugf("Source video retrieved via ytdl: %v", sourceVideo) + + return &sourceVideo, nil +} -- 2.45.3 From eb30fa4299ee9d2ecb8495312f002eafc61af15f Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Thu, 4 Nov 2021 15:28:28 -0500 Subject: [PATCH 4/7] Determine release time via YouTube API --- local/local.go | 131 +++++++++++++-------------------------- local/youtubeEnricher.go | 45 ++++++++++++++ local/ytapi.go | 83 +++++++++++++++++++++++++ local/ytdlVideoSource.go | 30 +++++++-- 4 files changed, 194 insertions(+), 95 deletions(-) create mode 100644 local/youtubeEnricher.go create mode 100644 local/ytapi.go diff --git a/local/local.go b/local/local.go index 1efd1fa..61f25f3 100644 --- a/local/local.go +++ b/local/local.go @@ -1,20 +1,17 @@ package local import ( - "encoding/json" "errors" - "fmt" - "io/ioutil" "os" - "path" "regexp" "strings" + "time" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/abadojack/whatlanggo" - "github.com/lbryio/ytsync/v5/downloader/ytdl" + "github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/tags_manager" ) @@ -24,6 +21,7 @@ type SyncContext struct { LbrynetAddr string ChannelID string PublishBid float64 + YouTubeSourceConfig *YouTubeSourceConfig } func (c *SyncContext) Validate() error { @@ -42,6 +40,10 @@ func (c *SyncContext) Validate() error { return nil } +type YouTubeSourceConfig struct { + YouTubeAPIKey string +} + var syncContext SyncContext func AddCommand(rootCmd *cobra.Command) { @@ -55,6 +57,10 @@ func AddCommand(rootCmd *cobra.Command) { cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") + + // For now, assume source is always YouTube + syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{} + cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key") rootCmd.AddCommand(cmd) } @@ -71,11 +77,9 @@ func localCmd(cmd *cobra.Command, args []string) { log.Error(err) return } - fmt.Println(syncContext.LbrynetAddr) - videoID := args[0] - log.Debugf("Running sync for YouTube video ID %s", videoID) + log.Debugf("Running sync for video ID %s", videoID) var publisher VideoPublisher publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) @@ -85,10 +89,12 @@ func localCmd(cmd *cobra.Command, args []string) { } var videoSource VideoSource - videoSource, err = NewYtdlVideoSource(syncContext.TempDir) - if err != nil { - log.Errorf("Error setting up video source: %v", err) - return + if syncContext.YouTubeSourceConfig != nil { + videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig) + if err != nil { + log.Errorf("Error setting up video source: %v", err) + return + } } sourceVideo, err := videoSource.GetVideo(videoID) @@ -116,78 +122,6 @@ func localCmd(cmd *cobra.Command, args []string) { log.Info("Done") } -func getVideoMetadata(basePath, videoID string) (*ytdl.YtdlVideo, string, error) { - metadataPath := basePath + ".info.json" - - _, err := os.Stat(metadataPath) - if err != nil && !os.IsNotExist(err) { - log.Errorf("Error determining if video metadata already exists: %v", err) - return nil, "", err - } else if err == nil { - log.Debugf("Video metadata file %s already exists. Attempting to load existing file.", metadataPath) - videoMetadata, err := loadVideoMetadata(metadataPath) - if err != nil { - log.Debugf("Error loading pre-existing video metadata: %v. Deleting file and attempting re-download.", err) - } else { - return videoMetadata, metadataPath, nil - } - } - - if err := downloadVideoMetadata(basePath, videoID); err != nil { - return nil, "", err - } - - videoMetadata, err := loadVideoMetadata(metadataPath) - return videoMetadata, metadataPath, err -} - -func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { - metadataBytes, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - var videoMetadata *ytdl.YtdlVideo - err = json.Unmarshal(metadataBytes, &videoMetadata) - if err != nil { - return nil, err - } - - return videoMetadata, nil -} - -func getVideoDownloadedPath(videoDir, videoID string) (string, error) { - files, err := ioutil.ReadDir(videoDir) - if err != nil { - return "", err - } - - for _, f := range files { - if f.IsDir() { - continue - } - if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) { - return path.Join(videoDir, f.Name()), nil - } - } - return "", errors.New("could not find any downloaded videos") - -} - -func getAbbrevDescription(v SourceVideo) string { - if v.Description == nil { - return v.SourceURL - } - - maxLength := 2800 - description := strings.TrimSpace(*v.Description) - additionalDescription := "\n" + v.SourceURL - if len(description) > maxLength { - description = description[:maxLength] - } - return description + "\n..." + additionalDescription -} - type SourceVideo struct { ID string Title *string @@ -243,9 +177,14 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab claimName := namer.NewNamer().GetNextName(title) - thumbnailURL := "" - if source.ThumbnailURL != nil { - thumbnailURL = *source.ThumbnailURL + thumbnailURL := source.ThumbnailURL + if thumbnailURL == nil { + thumbnailURL = util.PtrToString("") + } + + releaseTime := source.ReleaseTime + if releaseTime == nil { + releaseTime = util.PtrToInt64(time.Now().Unix()) } processed := PublishableVideo { @@ -254,8 +193,8 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab Description: getAbbrevDescription(source), Languages: languages, Tags: tags, - ReleaseTime: *source.ReleaseTime, - ThumbnailURL: thumbnailURL, + ReleaseTime: *releaseTime, + ThumbnailURL: *thumbnailURL, FullLocalPath: source.FullLocalPath, } @@ -264,6 +203,20 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab return &processed, nil } +func getAbbrevDescription(v SourceVideo) string { + if v.Description == nil { + return v.SourceURL + } + + maxLength := 2800 + description := strings.TrimSpace(*v.Description) + additionalDescription := "\n" + v.SourceURL + if len(description) > maxLength { + description = description[:maxLength] + } + return description + "\n..." + additionalDescription +} + type VideoSource interface { GetVideo(id string) (*SourceVideo, error) } diff --git a/local/youtubeEnricher.go b/local/youtubeEnricher.go new file mode 100644 index 0000000..38369b5 --- /dev/null +++ b/local/youtubeEnricher.go @@ -0,0 +1,45 @@ +package local + +import ( + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type YouTubeVideoEnricher interface { + EnrichMissing(source *SourceVideo) error +} + +type YouTubeAPIVideoEnricher struct { + api *YouTubeAPI +} + +func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) { + enricher := YouTubeAPIVideoEnricher{ + api: NewYouTubeAPI(apiKey), + } + return &enricher +} + +func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error { + if source.ReleaseTime != nil { + log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID) + return nil + } + + snippet, err := e.api.GetVideoSnippet(source.ID) + if err != nil { + log.Errorf("Error snippet data for video %s: %v", err) + return err + } + + publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt) + if err != nil { + log.Errorf("Error converting publishedAt to timestamp: %v", err) + } else { + source.ReleaseTime = util.PtrToInt64(publishedAt.Unix()) + } + return nil +} diff --git a/local/ytapi.go b/local/ytapi.go new file mode 100644 index 0000000..1924446 --- /dev/null +++ b/local/ytapi.go @@ -0,0 +1,83 @@ +package local + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + log "github.com/sirupsen/logrus" +) + +type YouTubeAPI struct { + apiKey string + client *http.Client +} + +func NewYouTubeAPI(apiKey string) (*YouTubeAPI) { + client := &http.Client { + Transport: &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + DisableCompression: true, + }, + } + + api := YouTubeAPI { + apiKey: apiKey, + client: client, + } + + return &api +} + +func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) { + req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil) + if err != nil { + log.Errorf("Error creating http client for YouTube API: %v", err) + return nil, err + } + + query := req.URL.Query() + query.Add("part", "snippet") + query.Add("id", videoID) + query.Add("key", a.apiKey) + req.URL.RawQuery = query.Encode() + + req.Header.Add("Accept", "application/json") + + resp, err := a.client.Do(req) + defer resp.Body.Close() + if err != nil { + log.Errorf("Error from YouTube API: %v", err) + return nil, err + } + + body, err := io.ReadAll(resp.Body) + log.Tracef("Response from YouTube API: %s", string(body[:])) + + var result videoListResponse + err = json.Unmarshal(body, &result) + if err != nil { + log.Errorf("Error deserializing video list response from YouTube API: %v", err) + return nil, err + } + + if len(result.Items) != 1 { + err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID) + return nil, err + } + + return &result.Items[0].Snippet, nil +} + +type videoListResponse struct { + Items []struct { + Snippet VideoSnippet `json:"snippet"` + } `json:"items"` +} + +type VideoSnippet struct { + PublishedAt string `json:"publishedAt"` +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index bc26884..a01b371 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -1,18 +1,17 @@ package local import ( - "time" - log "github.com/sirupsen/logrus" - "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/lbryio/ytsync/v5/downloader/ytdl" ) type YtdlVideoSource struct { downloader Ytdl + enrichers []YouTubeVideoEnricher } -func NewYtdlVideoSource(downloadDir string) (*YtdlVideoSource, error) { +func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) { ytdl, err := NewYtdl(downloadDir) if err != nil { return nil, err @@ -22,6 +21,11 @@ func NewYtdlVideoSource(downloadDir string) (*YtdlVideoSource, error) { downloader: *ytdl, } + if config.YouTubeAPIKey != "" { + ytapiEnricher := NewYouTubeAPIVideoEnricher(config.YouTubeAPIKey) + source.enrichers = append(source.enrichers, ytapiEnricher) + } + return &source, nil } @@ -36,6 +40,13 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { return nil, err } + var bestThumbnail *ytdl.Thumbnail = nil + for i, thumbnail := range metadata.Thumbnails { + if i == 0 || bestThumbnail.Width < thumbnail.Width { + bestThumbnail = &thumbnail + } + } + sourceVideo := SourceVideo { ID: id, Title: &metadata.Title, @@ -43,11 +54,18 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { SourceURL: "\nhttps://www.youtube.com/watch?v=" + id, Languages: []string{}, Tags: metadata.Tags, - ReleaseTime: util.PtrToInt64(time.Now().Unix()), - ThumbnailURL: nil, + ReleaseTime: nil, + ThumbnailURL: &bestThumbnail.URL, FullLocalPath: videoPath, } + for _, enricher := range s.enrichers { + err = enricher.EnrichMissing(&sourceVideo) + if err != nil { + log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err) + } + } + log.Debugf("Source video retrieved via ytdl: %v", sourceVideo) return &sourceVideo, nil -- 2.45.3 From e564dc844548dd521b9880a52f4ed4063268b609 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Fri, 5 Nov 2021 10:09:59 -0500 Subject: [PATCH 5/7] Add dry-run option --- local/local.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/local/local.go b/local/local.go index 61f25f3..fed260b 100644 --- a/local/local.go +++ b/local/local.go @@ -17,6 +17,7 @@ import ( ) type SyncContext struct { + DryRun bool TempDir string LbrynetAddr string ChannelID string @@ -53,6 +54,7 @@ func AddCommand(rootCmd *cobra.Command) { Run: localCmd, Args: cobra.ExactArgs(1), } + cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") @@ -109,15 +111,22 @@ func localCmd(cmd *cobra.Command, args []string) { return } - done, err := publisher.Publish(*processedVideo) - if err != nil { - log.Errorf("Error publishing video: %v", err) - return - } + if syncContext.DryRun { + log.Infoln("This is a dry run. Nothing will be published.") + log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName) + log.Debugf("Object to be published: %v", processedVideo) - err = <-done - if err != nil { - log.Errorf("Error while wating for stream to reflect: %v", err) + } else { + done, err := publisher.Publish(*processedVideo) + if err != nil { + log.Errorf("Error publishing video: %v", err) + return + } + + err = <-done + if err != nil { + log.Errorf("Error while wating for stream to reflect: %v", err) + } } log.Info("Done") } -- 2.45.3 From 9f6b15e84122ae79a3b5528d7db182b5f5aaceba Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Fri, 5 Nov 2021 11:22:20 -0500 Subject: [PATCH 6/7] Clean up local cache after publishing stream --- local/local.go | 11 +++++++++++ local/ytdl.go | 37 +++++++++++++++++++++++++++++++++++++ local/ytdlVideoSource.go | 4 ++++ 3 files changed, 52 insertions(+) diff --git a/local/local.go b/local/local.go index fed260b..abf3793 100644 --- a/local/local.go +++ b/local/local.go @@ -18,6 +18,7 @@ import ( type SyncContext struct { DryRun bool + KeepCache bool TempDir string LbrynetAddr string ChannelID string @@ -55,6 +56,7 @@ func AddCommand(rootCmd *cobra.Command) { Args: cobra.ExactArgs(1), } cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") + cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") @@ -128,6 +130,14 @@ func localCmd(cmd *cobra.Command, args []string) { log.Errorf("Error while wating for stream to reflect: %v", err) } } + + if !syncContext.KeepCache { + log.Infof("Deleting local files.") + err = videoSource.DeleteLocalCache(videoID) + if err != nil { + log.Errorf("Error deleting local files for video %s: %v", videoID, err) + } + } log.Info("Done") } @@ -228,6 +238,7 @@ func getAbbrevDescription(v SourceVideo) string { type VideoSource interface { GetVideo(id string) (*SourceVideo, error) + DeleteLocalCache(id string) error } type VideoPublisher interface { diff --git a/local/ytdl.go b/local/ytdl.go index 712aa44..7baa893 100644 --- a/local/ytdl.go +++ b/local/ytdl.go @@ -49,6 +49,7 @@ func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) { return metadata, nil } + func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) { basePath := path.Join(y.DownloadDir, videoID) metadataPath := basePath + ".info.json" @@ -101,6 +102,42 @@ func (y *Ytdl) GetVideoFile(videoID string) (string, error) { return *videoPath, nil } +func (y *Ytdl) DeleteVideoFiles(videoID string) error { + files, err := ioutil.ReadDir(y.DownloadDir) + if err != nil { + return err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if strings.Contains(f.Name(), videoID) { + videoPath := path.Join(y.DownloadDir, f.Name()) + err = os.Remove(videoPath) + if err != nil { + log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err) + return err + } + } + } + + return nil +} + +func deleteFile(path string) error { + _, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if file %s exists: %v", path, err) + return err + } else if err != nil { + log.Debugf("File %s does not exist. Skipping deletion.", path) + return nil + } + + return os.Remove(path) +} + func findDownloadedVideo(videoDir, videoID string) (*string, error) { files, err := ioutil.ReadDir(videoDir) if err != nil { diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index a01b371..b68c18f 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -70,3 +70,7 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { return &sourceVideo, nil } + +func (s *YtdlVideoSource) DeleteLocalCache(id string) error { + return s.downloader.DeleteVideoFiles(id) +} -- 2.45.3 From ce901f6b01dca938c381d7730dea64caeeb6c579 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Wed, 17 Nov 2021 10:07:20 -0600 Subject: [PATCH 7/7] Add option to not wait for reflection. Add instructions for getting a YouTube API key. Fix some minor issues. --- local/local.go | 35 +++++++++++++++++++++-------------- local/localSDKPublisher.go | 11 ++++++++--- local/readme.md | 18 +++++++++++++++++- 3 files changed, 46 insertions(+), 18 deletions(-) diff --git a/local/local.go b/local/local.go index abf3793..9b7837e 100644 --- a/local/local.go +++ b/local/local.go @@ -17,12 +17,13 @@ import ( ) type SyncContext struct { - DryRun bool - KeepCache bool - TempDir string - LbrynetAddr string - ChannelID string - PublishBid float64 + DryRun bool + KeepCache bool + ReflectStreams bool + TempDir string + LbrynetAddr string + ChannelID string + PublishBid float64 YouTubeSourceConfig *YouTubeSourceConfig } @@ -57,6 +58,7 @@ func AddCommand(rootCmd *cobra.Command) { } cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") + cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") @@ -119,15 +121,19 @@ func localCmd(cmd *cobra.Command, args []string) { log.Debugf("Object to be published: %v", processedVideo) } else { - done, err := publisher.Publish(*processedVideo) + doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) if err != nil { log.Errorf("Error publishing video: %v", err) return } - err = <-done - if err != nil { - log.Errorf("Error while wating for stream to reflect: %v", err) + if syncContext.ReflectStreams { + err = <-doneReflectingCh + if err != nil { + log.Errorf("Error while wating for stream to reflect: %v", err) + } + } else { + log.Debugln("Not waiting for stream to reflect.") } } @@ -227,13 +233,14 @@ func getAbbrevDescription(v SourceVideo) string { return v.SourceURL } - maxLength := 2800 + additionalDescription := "\n...\n" + v.SourceURL + maxLength := 2800 - len(additionalDescription) + description := strings.TrimSpace(*v.Description) - additionalDescription := "\n" + v.SourceURL if len(description) > maxLength { description = description[:maxLength] } - return description + "\n..." + additionalDescription + return description + additionalDescription } type VideoSource interface { @@ -242,5 +249,5 @@ type VideoSource interface { } type VideoPublisher interface { - Publish(video PublishableVideo) (chan error, error) + Publish(video PublishableVideo, reflectStream bool) (chan error, error) } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go index 4e99355..3886eb9 100644 --- a/local/localSDKPublisher.go +++ b/local/localSDKPublisher.go @@ -48,7 +48,7 @@ func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*Local return &publisher, nil } -func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) { +func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) { streamCreateOptions := jsonrpc.StreamCreateOptions { ClaimCreateOptions: jsonrpc.ClaimCreateOptions { Title: &video.Title, @@ -67,6 +67,10 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) return nil, err } + if !reflectStream { + return nil, nil + } + done := make(chan error, 1) go func() { for { @@ -88,8 +92,9 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) break } if !fileStatus.UploadingToReflector { - log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") - break + log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + done <- errors.New("Stream is not being reflected (check lbrynet settings).") + return } log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) time.Sleep(5 * time.Second) diff --git a/local/readme.md b/local/readme.md index 16efe8b..81b0235 100644 --- a/local/readme.md +++ b/local/readme.md @@ -5,6 +5,7 @@ - LBRY SDK (what do we actually need this for?) - youtube-dl - enough space to cache stuff +- YouTube data API key ## Process @@ -15,6 +16,21 @@ - or easier, just error if no channel - enough lbc in wallet? +### Getting a YouTube API key + +To access the YouTube data API, you will first need some kind of google account. + +The API has two methods of authentication, OAuth2 and API keys. This application uses API keys. +These API keys are basically like passwords, and so once obtained, they should not be shared. + +The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application): + + +1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console. +2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**. + +To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys). + ### Options to figure out what's already synced - simplest: assume nothing is synced yet @@ -50,4 +66,4 @@ ### Debugging -- dry-running the whole thing \ No newline at end of file +- dry-running the whole thing -- 2.45.3