From b9bf2f6e73b3ab8c4c6c5c8d9e4a2700cb7171c6 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Thu, 14 Oct 2021 13:03:57 -0500 Subject: [PATCH 01/10] Download video to sync to local cache --- local/local.go | 205 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 203 insertions(+), 2 deletions(-) diff --git a/local/local.go b/local/local.go index cacc57a..4bb23e3 100644 --- a/local/local.go +++ b/local/local.go @@ -1,22 +1,223 @@ package local import ( + "encoding/json" + "errors" "fmt" + "io/ioutil" + "time" + "os" + "os/exec" + "path" + "strings" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" + + "github.com/lbryio/lbry.go/v2/extras/jsonrpc" ) +type SyncContext struct { + TempDir string + LbrynetAddr string +} + +func (c *SyncContext) Validate() error { + if c.TempDir == "" { + return errors.New("No TempDir provided") + } + if c.LbrynetAddr == "" { + return errors.New("No Lbrynet address provided") + } + return nil +} + +var syncContext SyncContext + func AddCommand(rootCmd *cobra.Command) { cmd := &cobra.Command{ Use: "local", Short: "run a personal ytsync", Run: localCmd, + Args: cobra.ExactArgs(1), } - //cmd.Flags().StringVar(&cache, "cache", "", "path to cache") + cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") rootCmd.AddCommand(cmd) +} +func getEnvDefault(key, defaultValue string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return defaultValue } func localCmd(cmd *cobra.Command, args []string) { - fmt.Println("local") + err := syncContext.Validate() + if err != nil { + log.Error(err) + return + } + fmt.Println(syncContext.LbrynetAddr) + + videoID := args[0] + fmt.Println(videoID) + + lbrynet := jsonrpc.NewClient(syncContext.LbrynetAddr) + lbrynet.SetRPCTimeout(5 * time.Minute) + + status, err := lbrynet.Status() + if err != nil { + log.Error(err) + return + } + + fmt.Println(status.IsRunning) + fmt.Println(status.Wallet.Connected) + + videoBasePath := path.Join(syncContext.TempDir, videoID) + + _, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) + if err != nil { + log.Errorf("Error getting video metadata: %v", err) + return + } + + err = downloadVideo(videoBasePath, videoMetadataPath) + if err != nil { + log.Errorf("Error downloading video: %v", err) + } + + log.Info("Done") +} + +func getVideoMetadata(basePath, videoID string) (*ytdl.YtdlVideo, string, error) { + metadataPath := basePath + ".info.json" + + _, err := os.Stat(metadataPath) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if video metadata already exists: %v", err) + return nil, "", err + } else if err == nil { + log.Debugf("Video metadata file %s already exists. Attempting to load existing file.", metadataPath) + videoMetadata, err := loadVideoMetadata(metadataPath) + if err != nil { + log.Debugf("Error loading pre-existing video metadata: %v. Deleting file and attempting re-download.", err) + } else { + return videoMetadata, metadataPath, nil + } + } + + if err := downloadVideoMetadata(basePath, videoID); err != nil { + return nil, "", err + } + + videoMetadata, err := loadVideoMetadata(metadataPath) + return videoMetadata, metadataPath, err +} + +func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + metadataBytes, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + var videoMetadata *ytdl.YtdlVideo + err = json.Unmarshal(metadataBytes, &videoMetadata) + if err != nil { + return nil, err + } + + return videoMetadata, nil +} + +func downloadVideoMetadata(basePath, videoID string) error { + ytdlArgs := []string{ + "--skip-download", + "--write-info-json", + "--force-overwrites", + fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID), + "--cookies", + "cookies.txt", + "-o", + basePath, + } + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func downloadVideo(basePath, metadataPath string) error { + ytdlArgs := []string{ + "--no-progress", + "-o", + basePath, + "--merge-output-format", + "mp4", + "--postprocessor-args", + "ffmpeg:-movflags faststart", + "--abort-on-unavailable-fragment", + "--fragment-retries", + "1", + "--cookies", + "cookies.txt", + "--extractor-args", + "youtube:player_client=android", + "--load-info-json", + metadataPath, + "-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]", + } + + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func runCmd(cmd *exec.Cmd) ([]string, error) { + log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) + var err error + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + outLog, err := ioutil.ReadAll(stdout) + if err != nil { + return nil, err + } + errorLog, err := ioutil.ReadAll(stderr) + if err != nil { + return nil, err + } + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + if err != nil { + log.Error(string(errorLog)) + return nil, err + } + return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil + } } -- 2.45.2 From 8ea15afce855a3bd9819ce68f5dcfc91939465d0 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Mon, 25 Oct 2021 16:20:12 -0500 Subject: [PATCH 02/10] Basic stream publishing. Still needs some work. --- local/local.go | 166 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 156 insertions(+), 10 deletions(-) diff --git a/local/local.go b/local/local.go index 4bb23e3..284d7ef 100644 --- a/local/local.go +++ b/local/local.go @@ -9,19 +9,27 @@ import ( "os" "os/exec" "path" + "regexp" + "sort" "strings" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/abadojack/whatlanggo" "github.com/lbryio/ytsync/v5/downloader/ytdl" + "github.com/lbryio/ytsync/v5/namer" + "github.com/lbryio/ytsync/v5/tags_manager" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/lbryio/lbry.go/v2/extras/util" ) type SyncContext struct { TempDir string LbrynetAddr string + ChannelID string + PublishBid float64 } func (c *SyncContext) Validate() error { @@ -31,6 +39,12 @@ func (c *SyncContext) Validate() error { if c.LbrynetAddr == "" { return errors.New("No Lbrynet address provided") } + if c.ChannelID == "" { + return errors.New("No channel ID provided") + } + if c.PublishBid <= 0.0 { + return errors.New("Publish bid is not greater than zero") + } return nil } @@ -44,7 +58,9 @@ func AddCommand(rootCmd *cobra.Command) { Args: cobra.ExactArgs(1), } cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") + cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") rootCmd.AddCommand(cmd) } @@ -75,12 +91,25 @@ func localCmd(cmd *cobra.Command, args []string) { return } - fmt.Println(status.IsRunning) - fmt.Println(status.Wallet.Connected) + if !status.IsRunning { + log.Error("SDK is not running") + return + } + + // Should check to see if the SDK owns the channel + + // Should check to see if wallet is unlocked + // but jsonrpc.Client doesn't have WalletStatus method + // so skip for now + + // Should check to see if streams are configured to be reflected and warn if not + // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected + // so use File.UploadingToReflector as a proxy for now + videoBasePath := path.Join(syncContext.TempDir, videoID) - _, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) + videoMetadata, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) if err != nil { log.Errorf("Error getting video metadata: %v", err) return @@ -89,6 +118,82 @@ func localCmd(cmd *cobra.Command, args []string) { err = downloadVideo(videoBasePath, videoMetadataPath) if err != nil { log.Errorf("Error downloading video: %v", err) + return + } + + tags, err := tags_manager.SanitizeTags(videoMetadata.Tags, syncContext.ChannelID) + if err != nil { + log.Errorf("Error sanitizing tags: %v", err) + return + } + + urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) + descriptionSample := urlsRegex.ReplaceAllString(videoMetadata.Description, "") + info := whatlanggo.Detect(descriptionSample) + info2 := whatlanggo.Detect(videoMetadata.Title) + var languages []string = nil + if info.IsReliable() && info.Lang.Iso6391() != "" { + language := info.Lang.Iso6391() + languages = []string{language} + } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { + language := info2.Lang.Iso6391() + languages = []string{language} + } + // Thumbnail and ReleaseTime need to be properly determined + streamCreateOptions := jsonrpc.StreamCreateOptions { + ClaimCreateOptions: jsonrpc.ClaimCreateOptions { + Title: &videoMetadata.Title, + Description: util.PtrToString(getAbbrevDescription(videoMetadata)), + Languages: languages, + //ThumbnailURL: &v.thumbnailURL, + Tags: tags, + }, + ReleaseTime: util.PtrToInt64(time.Now().Unix()), + ChannelID: &syncContext.ChannelID, + License: util.PtrToString("Copyrighted (contact publisher)"), + } + + videoPath, err := getVideoDownloadedPath(syncContext.TempDir, videoID) + if err != nil { + log.Errorf("Error determining downloaded video path: %v", err) + } + + fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Title) + fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Description) + fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Languages) + fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Tags) + + claimName := namer.NewNamer().GetNextName(videoMetadata.Title) + log.Infof("Publishing stream as %s", claimName) + + txSummary, err := lbrynet.StreamCreate(claimName, videoPath, syncContext.PublishBid, streamCreateOptions) + if err != nil { + log.Errorf("Error creating stream: %v", err) + return + } + + for { + fileListResponse, fileIndex, err := findFileByTxid(lbrynet, txSummary.Txid) + if err != nil { + log.Errorf("Error finding file by txid: %v", err) + return + } + if fileListResponse == nil { + log.Errorf("Could not find file in list with correct txid") + return + } + + fileStatus := fileListResponse.Items[fileIndex] + if fileStatus.IsFullyReflected { + log.Info("Stream is fully reflected") + break + } + if !fileStatus.UploadingToReflector { + log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + break + } + log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) + time.Sleep(5 * time.Second) } log.Info("Done") @@ -120,13 +225,7 @@ func getVideoMetadata(basePath, videoID string) (*ytdl.YtdlVideo, string, error) } func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { - f, err := os.Open(path) - if err != nil { - return nil, err - } - defer f.Close() - - metadataBytes, err := ioutil.ReadAll(f) + metadataBytes, err := os.ReadFile(path) if err != nil { return nil, err } @@ -221,3 +320,50 @@ func runCmd(cmd *exec.Cmd) ([]string, error) { return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil } } + +func getVideoDownloadedPath(videoDir, videoID string) (string, error) { + files, err := ioutil.ReadDir(videoDir) + if err != nil { + return "", err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) { + return path.Join(videoDir, f.Name()), nil + } + } + return "", errors.New("could not find any downloaded videos") + +} + +func getAbbrevDescription(v *ytdl.YtdlVideo) string { + maxLength := 2800 + description := strings.TrimSpace(v.Description) + additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.ID + if len(description) > maxLength { + description = description[:maxLength] + } + return description + "\n..." + additionalDescription +} + +// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed +func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { + response, err := client.FileList(0, 20) + for { + if err != nil { + log.Errorf("Error getting file list page: %v", err) + return nil, 0, err + } + index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) + if index < len(response.Items) { + return response, index, nil + } + if response.Page >= response.TotalPages { + return nil, 0, nil + } + response, err = client.FileList(response.Page + 1, 20) + } +} -- 2.45.2 From 2ba960ae01e6ba080c33c20b8bded43c3b515514 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Tue, 2 Nov 2021 15:46:03 -0500 Subject: [PATCH 03/10] Refactor to support future direction of development --- local/local.go | 306 +++++++++++++------------------------ local/localSDKPublisher.go | 120 +++++++++++++++ local/ytdl.go | 202 ++++++++++++++++++++++++ local/ytdlVideoSource.go | 54 +++++++ 4 files changed, 481 insertions(+), 201 deletions(-) create mode 100644 local/localSDKPublisher.go create mode 100644 local/ytdl.go create mode 100644 local/ytdlVideoSource.go diff --git a/local/local.go b/local/local.go index 284d7ef..1efd1fa 100644 --- a/local/local.go +++ b/local/local.go @@ -5,12 +5,9 @@ import ( "errors" "fmt" "io/ioutil" - "time" "os" - "os/exec" "path" "regexp" - "sort" "strings" log "github.com/sirupsen/logrus" @@ -20,9 +17,6 @@ import ( "github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/tags_manager" - - "github.com/lbryio/lbry.go/v2/extras/jsonrpc" - "github.com/lbryio/lbry.go/v2/extras/util" ) type SyncContext struct { @@ -80,122 +74,45 @@ func localCmd(cmd *cobra.Command, args []string) { fmt.Println(syncContext.LbrynetAddr) videoID := args[0] - fmt.Println(videoID) - lbrynet := jsonrpc.NewClient(syncContext.LbrynetAddr) - lbrynet.SetRPCTimeout(5 * time.Minute) + log.Debugf("Running sync for YouTube video ID %s", videoID) - status, err := lbrynet.Status() + var publisher VideoPublisher + publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) if err != nil { - log.Error(err) + log.Errorf("Error setting up publisher: %v", err) return } - if !status.IsRunning { - log.Error("SDK is not running") + var videoSource VideoSource + videoSource, err = NewYtdlVideoSource(syncContext.TempDir) + if err != nil { + log.Errorf("Error setting up video source: %v", err) return } - // Should check to see if the SDK owns the channel - - // Should check to see if wallet is unlocked - // but jsonrpc.Client doesn't have WalletStatus method - // so skip for now - - // Should check to see if streams are configured to be reflected and warn if not - // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected - // so use File.UploadingToReflector as a proxy for now - - - videoBasePath := path.Join(syncContext.TempDir, videoID) - - videoMetadata, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID) + sourceVideo, err := videoSource.GetVideo(videoID) if err != nil { - log.Errorf("Error getting video metadata: %v", err) + log.Errorf("Error getting source video: %v", err) return } - err = downloadVideo(videoBasePath, videoMetadataPath) + processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) if err != nil { - log.Errorf("Error downloading video: %v", err) + log.Errorf("Error processing source video for publishing: %v", err) return } - tags, err := tags_manager.SanitizeTags(videoMetadata.Tags, syncContext.ChannelID) + done, err := publisher.Publish(*processedVideo) if err != nil { - log.Errorf("Error sanitizing tags: %v", err) + log.Errorf("Error publishing video: %v", err) return } - urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) - descriptionSample := urlsRegex.ReplaceAllString(videoMetadata.Description, "") - info := whatlanggo.Detect(descriptionSample) - info2 := whatlanggo.Detect(videoMetadata.Title) - var languages []string = nil - if info.IsReliable() && info.Lang.Iso6391() != "" { - language := info.Lang.Iso6391() - languages = []string{language} - } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { - language := info2.Lang.Iso6391() - languages = []string{language} - } - // Thumbnail and ReleaseTime need to be properly determined - streamCreateOptions := jsonrpc.StreamCreateOptions { - ClaimCreateOptions: jsonrpc.ClaimCreateOptions { - Title: &videoMetadata.Title, - Description: util.PtrToString(getAbbrevDescription(videoMetadata)), - Languages: languages, - //ThumbnailURL: &v.thumbnailURL, - Tags: tags, - }, - ReleaseTime: util.PtrToInt64(time.Now().Unix()), - ChannelID: &syncContext.ChannelID, - License: util.PtrToString("Copyrighted (contact publisher)"), - } - - videoPath, err := getVideoDownloadedPath(syncContext.TempDir, videoID) + err = <-done if err != nil { - log.Errorf("Error determining downloaded video path: %v", err) + log.Errorf("Error while wating for stream to reflect: %v", err) } - - fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Title) - fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Description) - fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Languages) - fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Tags) - - claimName := namer.NewNamer().GetNextName(videoMetadata.Title) - log.Infof("Publishing stream as %s", claimName) - - txSummary, err := lbrynet.StreamCreate(claimName, videoPath, syncContext.PublishBid, streamCreateOptions) - if err != nil { - log.Errorf("Error creating stream: %v", err) - return - } - - for { - fileListResponse, fileIndex, err := findFileByTxid(lbrynet, txSummary.Txid) - if err != nil { - log.Errorf("Error finding file by txid: %v", err) - return - } - if fileListResponse == nil { - log.Errorf("Could not find file in list with correct txid") - return - } - - fileStatus := fileListResponse.Items[fileIndex] - if fileStatus.IsFullyReflected { - log.Info("Stream is fully reflected") - break - } - if !fileStatus.UploadingToReflector { - log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") - break - } - log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) - time.Sleep(5 * time.Second) - } - log.Info("Done") } @@ -239,88 +156,6 @@ func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { return videoMetadata, nil } -func downloadVideoMetadata(basePath, videoID string) error { - ytdlArgs := []string{ - "--skip-download", - "--write-info-json", - "--force-overwrites", - fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID), - "--cookies", - "cookies.txt", - "-o", - basePath, - } - ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) - output, err := runCmd(ytdlCmd) - log.Debug(output) - return err -} - -func downloadVideo(basePath, metadataPath string) error { - ytdlArgs := []string{ - "--no-progress", - "-o", - basePath, - "--merge-output-format", - "mp4", - "--postprocessor-args", - "ffmpeg:-movflags faststart", - "--abort-on-unavailable-fragment", - "--fragment-retries", - "1", - "--cookies", - "cookies.txt", - "--extractor-args", - "youtube:player_client=android", - "--load-info-json", - metadataPath, - "-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]", - } - - ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) - output, err := runCmd(ytdlCmd) - log.Debug(output) - return err -} - -func runCmd(cmd *exec.Cmd) ([]string, error) { - log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) - var err error - stderr, err := cmd.StderrPipe() - if err != nil { - return nil, err - } - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - err = cmd.Start() - if err != nil { - return nil, err - } - outLog, err := ioutil.ReadAll(stdout) - if err != nil { - return nil, err - } - errorLog, err := ioutil.ReadAll(stderr) - if err != nil { - return nil, err - } - done := make(chan error, 1) - go func() { - done <- cmd.Wait() - }() - - select { - case err := <-done: - if err != nil { - log.Error(string(errorLog)) - return nil, err - } - return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil - } -} - func getVideoDownloadedPath(videoDir, videoID string) (string, error) { files, err := ioutil.ReadDir(videoDir) if err != nil { @@ -339,31 +174,100 @@ func getVideoDownloadedPath(videoDir, videoID string) (string, error) { } -func getAbbrevDescription(v *ytdl.YtdlVideo) string { +func getAbbrevDescription(v SourceVideo) string { + if v.Description == nil { + return v.SourceURL + } + maxLength := 2800 - description := strings.TrimSpace(v.Description) - additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.ID + description := strings.TrimSpace(*v.Description) + additionalDescription := "\n" + v.SourceURL if len(description) > maxLength { description = description[:maxLength] } return description + "\n..." + additionalDescription } -// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed -func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { - response, err := client.FileList(0, 20) - for { - if err != nil { - log.Errorf("Error getting file list page: %v", err) - return nil, 0, err - } - index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) - if index < len(response.Items) { - return response, index, nil - } - if response.Page >= response.TotalPages { - return nil, 0, nil - } - response, err = client.FileList(response.Page + 1, 20) - } +type SourceVideo struct { + ID string + Title *string + Description *string + SourceURL string + Languages []string + Tags []string + ReleaseTime *int64 + ThumbnailURL *string + FullLocalPath string +} + +type PublishableVideo struct { + ID string + ClaimName string + Title string + Description string + SourceURL string + Languages []string + Tags []string + ReleaseTime int64 + ThumbnailURL string + FullLocalPath string +} + +func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) { + tags, err := tags_manager.SanitizeTags(source.Tags, channelID) + if err != nil { + log.Errorf("Error sanitizing tags: %v", err) + return nil, err + } + + descriptionSample := "" + if source.Description != nil { + urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) + descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "") + } + info := whatlanggo.Detect(descriptionSample) + + title := "" + if source.Title != nil { + title = *source.Title + } + info2 := whatlanggo.Detect(title) + var languages []string = nil + if info.IsReliable() && info.Lang.Iso6391() != "" { + language := info.Lang.Iso6391() + languages = []string{language} + } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { + language := info2.Lang.Iso6391() + languages = []string{language} + } + + claimName := namer.NewNamer().GetNextName(title) + + thumbnailURL := "" + if source.ThumbnailURL != nil { + thumbnailURL = *source.ThumbnailURL + } + + processed := PublishableVideo { + ClaimName: claimName, + Title: title, + Description: getAbbrevDescription(source), + Languages: languages, + Tags: tags, + ReleaseTime: *source.ReleaseTime, + ThumbnailURL: thumbnailURL, + FullLocalPath: source.FullLocalPath, + } + + log.Debugf("Video prepared for publication: %v", processed) + + return &processed, nil +} + +type VideoSource interface { + GetVideo(id string) (*SourceVideo, error) +} + +type VideoPublisher interface { + Publish(video PublishableVideo) (chan error, error) } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go new file mode 100644 index 0000000..4e99355 --- /dev/null +++ b/local/localSDKPublisher.go @@ -0,0 +1,120 @@ +package local + +import ( + "errors" + "sort" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type LocalSDKPublisher struct { + channelID string + publishBid float64 + lbrynet *jsonrpc.Client +} + +func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) { + lbrynet := jsonrpc.NewClient(sdkAddr) + lbrynet.SetRPCTimeout(5 * time.Minute) + + status, err := lbrynet.Status() + if err != nil { + return nil, err + } + + if !status.IsRunning { + return nil, errors.New("SDK is not running") + } + + // Should check to see if the SDK owns the channel + + // Should check to see if wallet is unlocked + // but jsonrpc.Client doesn't have WalletStatus method + // so skip for now + + // Should check to see if streams are configured to be reflected and warn if not + // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected + // so use File.UploadingToReflector as a proxy for now + + publisher := LocalSDKPublisher { + channelID: channelID, + publishBid: publishBid, + lbrynet: lbrynet, + } + return &publisher, nil +} + +func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) { + streamCreateOptions := jsonrpc.StreamCreateOptions { + ClaimCreateOptions: jsonrpc.ClaimCreateOptions { + Title: &video.Title, + Description: &video.Description, + Languages: video.Languages, + ThumbnailURL: &video.ThumbnailURL, + Tags: video.Tags, + }, + ReleaseTime: &video.ReleaseTime, + ChannelID: &p.channelID, + License: util.PtrToString("Copyrighted (contact publisher)"), + } + + txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) + if err != nil { + return nil, err + } + + done := make(chan error, 1) + go func() { + for { + fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid) + if err != nil { + log.Errorf("Error finding file by txid: %v", err) + done <- err + return + } + if fileListResponse == nil { + log.Errorf("Could not find file in list with correct txid") + done <- err + return + } + + fileStatus := fileListResponse.Items[fileIndex] + if fileStatus.IsFullyReflected { + log.Info("Stream is fully reflected") + break + } + if !fileStatus.UploadingToReflector { + log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + break + } + log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) + time.Sleep(5 * time.Second) + } + done <- nil + }() + + return done, nil +} + +// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed +func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { + response, err := client.FileList(0, 20) + for { + if err != nil { + log.Errorf("Error getting file list page: %v", err) + return nil, 0, err + } + index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) + if index < len(response.Items) { + return response, index, nil + } + if response.Page >= response.TotalPages { + return nil, 0, nil + } + response, err = client.FileList(response.Page + 1, 20) + } +} diff --git a/local/ytdl.go b/local/ytdl.go new file mode 100644 index 0000000..712aa44 --- /dev/null +++ b/local/ytdl.go @@ -0,0 +1,202 @@ +package local + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "strings" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" +) + +type Ytdl struct { + DownloadDir string +} + +func NewYtdl(downloadDir string) (*Ytdl, error) { + // TODO validate download dir + + y := Ytdl { + DownloadDir: downloadDir, + } + + return &y, nil +} + +func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) { + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + return nil, err + } + + metadataBytes, err := os.ReadFile(metadataPath) + if err != nil { + return nil, err + } + + var metadata *ytdl.YtdlVideo + err = json.Unmarshal(metadataBytes, &metadata) + if err != nil { + return nil, err + } + + return metadata, nil +} + +func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) { + basePath := path.Join(y.DownloadDir, videoID) + metadataPath := basePath + ".info.json" + + _, err := os.Stat(metadataPath) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if video metadata already exists: %v", err) + return "", err + } else if err != nil { + log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID) + err = downloadVideoMetadata(basePath, videoID) + if err != nil { + return "", err + } + } + + return metadataPath, nil +} + +func (y *Ytdl) GetVideoFile(videoID string) (string, error) { + videoPath, err := findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + return "", err + } + + if videoPath != nil { + return *videoPath, nil + } + + basePath := path.Join(y.DownloadDir, videoID) + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + log.Errorf("Error getting metadata path in preparation for video download: %v", err) + return "", err + } + err = downloadVideo(basePath, metadataPath) + if err != nil { + return "", nil + } + + videoPath, err = findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err) + return "", err + } + if videoPath == nil { + return "", errors.New("Could not find a downloaded video after successful download.") + } + + return *videoPath, nil +} + +func findDownloadedVideo(videoDir, videoID string) (*string, error) { + files, err := ioutil.ReadDir(videoDir) + if err != nil { + return nil, err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) { + videoPath := path.Join(videoDir, f.Name()) + return &videoPath, nil + } + } + return nil, nil +} + +func downloadVideoMetadata(basePath, videoID string) error { + ytdlArgs := []string{ + "--skip-download", + "--write-info-json", + "--force-overwrites", + fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID), + "--cookies", + "cookies.txt", + "-o", + basePath, + } + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func downloadVideo(basePath, metadataPath string) error { + ytdlArgs := []string{ + "--no-progress", + "-o", + basePath, + "--merge-output-format", + "mp4", + "--postprocessor-args", + "ffmpeg:-movflags faststart", + "--abort-on-unavailable-fragment", + "--fragment-retries", + "1", + "--cookies", + "cookies.txt", + "--extractor-args", + "youtube:player_client=android", + "--load-info-json", + metadataPath, + "-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]", + } + + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func runCmd(cmd *exec.Cmd) ([]string, error) { + log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) + var err error + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + outLog, err := ioutil.ReadAll(stdout) + if err != nil { + return nil, err + } + errorLog, err := ioutil.ReadAll(stderr) + if err != nil { + return nil, err + } + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + if err != nil { + log.Error(string(errorLog)) + return nil, err + } + return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil + } +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go new file mode 100644 index 0000000..bc26884 --- /dev/null +++ b/local/ytdlVideoSource.go @@ -0,0 +1,54 @@ +package local + +import ( + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type YtdlVideoSource struct { + downloader Ytdl +} + +func NewYtdlVideoSource(downloadDir string) (*YtdlVideoSource, error) { + ytdl, err := NewYtdl(downloadDir) + if err != nil { + return nil, err + } + + source := YtdlVideoSource { + downloader: *ytdl, + } + + return &source, nil +} + +func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { + metadata, err := s.downloader.GetVideoMetadata(id) + if err != nil { + return nil, err + } + + videoPath, err := s.downloader.GetVideoFile(id) + if err != nil { + return nil, err + } + + sourceVideo := SourceVideo { + ID: id, + Title: &metadata.Title, + Description: &metadata.Description, + SourceURL: "\nhttps://www.youtube.com/watch?v=" + id, + Languages: []string{}, + Tags: metadata.Tags, + ReleaseTime: util.PtrToInt64(time.Now().Unix()), + ThumbnailURL: nil, + FullLocalPath: videoPath, + } + + log.Debugf("Source video retrieved via ytdl: %v", sourceVideo) + + return &sourceVideo, nil +} -- 2.45.2 From eb30fa4299ee9d2ecb8495312f002eafc61af15f Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Thu, 4 Nov 2021 15:28:28 -0500 Subject: [PATCH 04/10] Determine release time via YouTube API --- local/local.go | 131 +++++++++++++-------------------------- local/youtubeEnricher.go | 45 ++++++++++++++ local/ytapi.go | 83 +++++++++++++++++++++++++ local/ytdlVideoSource.go | 30 +++++++-- 4 files changed, 194 insertions(+), 95 deletions(-) create mode 100644 local/youtubeEnricher.go create mode 100644 local/ytapi.go diff --git a/local/local.go b/local/local.go index 1efd1fa..61f25f3 100644 --- a/local/local.go +++ b/local/local.go @@ -1,20 +1,17 @@ package local import ( - "encoding/json" "errors" - "fmt" - "io/ioutil" "os" - "path" "regexp" "strings" + "time" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/abadojack/whatlanggo" - "github.com/lbryio/ytsync/v5/downloader/ytdl" + "github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/tags_manager" ) @@ -24,6 +21,7 @@ type SyncContext struct { LbrynetAddr string ChannelID string PublishBid float64 + YouTubeSourceConfig *YouTubeSourceConfig } func (c *SyncContext) Validate() error { @@ -42,6 +40,10 @@ func (c *SyncContext) Validate() error { return nil } +type YouTubeSourceConfig struct { + YouTubeAPIKey string +} + var syncContext SyncContext func AddCommand(rootCmd *cobra.Command) { @@ -55,6 +57,10 @@ func AddCommand(rootCmd *cobra.Command) { cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") + + // For now, assume source is always YouTube + syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{} + cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key") rootCmd.AddCommand(cmd) } @@ -71,11 +77,9 @@ func localCmd(cmd *cobra.Command, args []string) { log.Error(err) return } - fmt.Println(syncContext.LbrynetAddr) - videoID := args[0] - log.Debugf("Running sync for YouTube video ID %s", videoID) + log.Debugf("Running sync for video ID %s", videoID) var publisher VideoPublisher publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) @@ -85,10 +89,12 @@ func localCmd(cmd *cobra.Command, args []string) { } var videoSource VideoSource - videoSource, err = NewYtdlVideoSource(syncContext.TempDir) - if err != nil { - log.Errorf("Error setting up video source: %v", err) - return + if syncContext.YouTubeSourceConfig != nil { + videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig) + if err != nil { + log.Errorf("Error setting up video source: %v", err) + return + } } sourceVideo, err := videoSource.GetVideo(videoID) @@ -116,78 +122,6 @@ func localCmd(cmd *cobra.Command, args []string) { log.Info("Done") } -func getVideoMetadata(basePath, videoID string) (*ytdl.YtdlVideo, string, error) { - metadataPath := basePath + ".info.json" - - _, err := os.Stat(metadataPath) - if err != nil && !os.IsNotExist(err) { - log.Errorf("Error determining if video metadata already exists: %v", err) - return nil, "", err - } else if err == nil { - log.Debugf("Video metadata file %s already exists. Attempting to load existing file.", metadataPath) - videoMetadata, err := loadVideoMetadata(metadataPath) - if err != nil { - log.Debugf("Error loading pre-existing video metadata: %v. Deleting file and attempting re-download.", err) - } else { - return videoMetadata, metadataPath, nil - } - } - - if err := downloadVideoMetadata(basePath, videoID); err != nil { - return nil, "", err - } - - videoMetadata, err := loadVideoMetadata(metadataPath) - return videoMetadata, metadataPath, err -} - -func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) { - metadataBytes, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - var videoMetadata *ytdl.YtdlVideo - err = json.Unmarshal(metadataBytes, &videoMetadata) - if err != nil { - return nil, err - } - - return videoMetadata, nil -} - -func getVideoDownloadedPath(videoDir, videoID string) (string, error) { - files, err := ioutil.ReadDir(videoDir) - if err != nil { - return "", err - } - - for _, f := range files { - if f.IsDir() { - continue - } - if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) { - return path.Join(videoDir, f.Name()), nil - } - } - return "", errors.New("could not find any downloaded videos") - -} - -func getAbbrevDescription(v SourceVideo) string { - if v.Description == nil { - return v.SourceURL - } - - maxLength := 2800 - description := strings.TrimSpace(*v.Description) - additionalDescription := "\n" + v.SourceURL - if len(description) > maxLength { - description = description[:maxLength] - } - return description + "\n..." + additionalDescription -} - type SourceVideo struct { ID string Title *string @@ -243,9 +177,14 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab claimName := namer.NewNamer().GetNextName(title) - thumbnailURL := "" - if source.ThumbnailURL != nil { - thumbnailURL = *source.ThumbnailURL + thumbnailURL := source.ThumbnailURL + if thumbnailURL == nil { + thumbnailURL = util.PtrToString("") + } + + releaseTime := source.ReleaseTime + if releaseTime == nil { + releaseTime = util.PtrToInt64(time.Now().Unix()) } processed := PublishableVideo { @@ -254,8 +193,8 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab Description: getAbbrevDescription(source), Languages: languages, Tags: tags, - ReleaseTime: *source.ReleaseTime, - ThumbnailURL: thumbnailURL, + ReleaseTime: *releaseTime, + ThumbnailURL: *thumbnailURL, FullLocalPath: source.FullLocalPath, } @@ -264,6 +203,20 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab return &processed, nil } +func getAbbrevDescription(v SourceVideo) string { + if v.Description == nil { + return v.SourceURL + } + + maxLength := 2800 + description := strings.TrimSpace(*v.Description) + additionalDescription := "\n" + v.SourceURL + if len(description) > maxLength { + description = description[:maxLength] + } + return description + "\n..." + additionalDescription +} + type VideoSource interface { GetVideo(id string) (*SourceVideo, error) } diff --git a/local/youtubeEnricher.go b/local/youtubeEnricher.go new file mode 100644 index 0000000..38369b5 --- /dev/null +++ b/local/youtubeEnricher.go @@ -0,0 +1,45 @@ +package local + +import ( + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type YouTubeVideoEnricher interface { + EnrichMissing(source *SourceVideo) error +} + +type YouTubeAPIVideoEnricher struct { + api *YouTubeAPI +} + +func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) { + enricher := YouTubeAPIVideoEnricher{ + api: NewYouTubeAPI(apiKey), + } + return &enricher +} + +func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error { + if source.ReleaseTime != nil { + log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID) + return nil + } + + snippet, err := e.api.GetVideoSnippet(source.ID) + if err != nil { + log.Errorf("Error snippet data for video %s: %v", err) + return err + } + + publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt) + if err != nil { + log.Errorf("Error converting publishedAt to timestamp: %v", err) + } else { + source.ReleaseTime = util.PtrToInt64(publishedAt.Unix()) + } + return nil +} diff --git a/local/ytapi.go b/local/ytapi.go new file mode 100644 index 0000000..1924446 --- /dev/null +++ b/local/ytapi.go @@ -0,0 +1,83 @@ +package local + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + log "github.com/sirupsen/logrus" +) + +type YouTubeAPI struct { + apiKey string + client *http.Client +} + +func NewYouTubeAPI(apiKey string) (*YouTubeAPI) { + client := &http.Client { + Transport: &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + DisableCompression: true, + }, + } + + api := YouTubeAPI { + apiKey: apiKey, + client: client, + } + + return &api +} + +func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) { + req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil) + if err != nil { + log.Errorf("Error creating http client for YouTube API: %v", err) + return nil, err + } + + query := req.URL.Query() + query.Add("part", "snippet") + query.Add("id", videoID) + query.Add("key", a.apiKey) + req.URL.RawQuery = query.Encode() + + req.Header.Add("Accept", "application/json") + + resp, err := a.client.Do(req) + defer resp.Body.Close() + if err != nil { + log.Errorf("Error from YouTube API: %v", err) + return nil, err + } + + body, err := io.ReadAll(resp.Body) + log.Tracef("Response from YouTube API: %s", string(body[:])) + + var result videoListResponse + err = json.Unmarshal(body, &result) + if err != nil { + log.Errorf("Error deserializing video list response from YouTube API: %v", err) + return nil, err + } + + if len(result.Items) != 1 { + err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID) + return nil, err + } + + return &result.Items[0].Snippet, nil +} + +type videoListResponse struct { + Items []struct { + Snippet VideoSnippet `json:"snippet"` + } `json:"items"` +} + +type VideoSnippet struct { + PublishedAt string `json:"publishedAt"` +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index bc26884..a01b371 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -1,18 +1,17 @@ package local import ( - "time" - log "github.com/sirupsen/logrus" - "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/lbryio/ytsync/v5/downloader/ytdl" ) type YtdlVideoSource struct { downloader Ytdl + enrichers []YouTubeVideoEnricher } -func NewYtdlVideoSource(downloadDir string) (*YtdlVideoSource, error) { +func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) { ytdl, err := NewYtdl(downloadDir) if err != nil { return nil, err @@ -22,6 +21,11 @@ func NewYtdlVideoSource(downloadDir string) (*YtdlVideoSource, error) { downloader: *ytdl, } + if config.YouTubeAPIKey != "" { + ytapiEnricher := NewYouTubeAPIVideoEnricher(config.YouTubeAPIKey) + source.enrichers = append(source.enrichers, ytapiEnricher) + } + return &source, nil } @@ -36,6 +40,13 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { return nil, err } + var bestThumbnail *ytdl.Thumbnail = nil + for i, thumbnail := range metadata.Thumbnails { + if i == 0 || bestThumbnail.Width < thumbnail.Width { + bestThumbnail = &thumbnail + } + } + sourceVideo := SourceVideo { ID: id, Title: &metadata.Title, @@ -43,11 +54,18 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { SourceURL: "\nhttps://www.youtube.com/watch?v=" + id, Languages: []string{}, Tags: metadata.Tags, - ReleaseTime: util.PtrToInt64(time.Now().Unix()), - ThumbnailURL: nil, + ReleaseTime: nil, + ThumbnailURL: &bestThumbnail.URL, FullLocalPath: videoPath, } + for _, enricher := range s.enrichers { + err = enricher.EnrichMissing(&sourceVideo) + if err != nil { + log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err) + } + } + log.Debugf("Source video retrieved via ytdl: %v", sourceVideo) return &sourceVideo, nil -- 2.45.2 From e564dc844548dd521b9880a52f4ed4063268b609 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Fri, 5 Nov 2021 10:09:59 -0500 Subject: [PATCH 05/10] Add dry-run option --- local/local.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/local/local.go b/local/local.go index 61f25f3..fed260b 100644 --- a/local/local.go +++ b/local/local.go @@ -17,6 +17,7 @@ import ( ) type SyncContext struct { + DryRun bool TempDir string LbrynetAddr string ChannelID string @@ -53,6 +54,7 @@ func AddCommand(rootCmd *cobra.Command) { Run: localCmd, Args: cobra.ExactArgs(1), } + cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") @@ -109,15 +111,22 @@ func localCmd(cmd *cobra.Command, args []string) { return } - done, err := publisher.Publish(*processedVideo) - if err != nil { - log.Errorf("Error publishing video: %v", err) - return - } + if syncContext.DryRun { + log.Infoln("This is a dry run. Nothing will be published.") + log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName) + log.Debugf("Object to be published: %v", processedVideo) - err = <-done - if err != nil { - log.Errorf("Error while wating for stream to reflect: %v", err) + } else { + done, err := publisher.Publish(*processedVideo) + if err != nil { + log.Errorf("Error publishing video: %v", err) + return + } + + err = <-done + if err != nil { + log.Errorf("Error while wating for stream to reflect: %v", err) + } } log.Info("Done") } -- 2.45.2 From 9f6b15e84122ae79a3b5528d7db182b5f5aaceba Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Fri, 5 Nov 2021 11:22:20 -0500 Subject: [PATCH 06/10] Clean up local cache after publishing stream --- local/local.go | 11 +++++++++++ local/ytdl.go | 37 +++++++++++++++++++++++++++++++++++++ local/ytdlVideoSource.go | 4 ++++ 3 files changed, 52 insertions(+) diff --git a/local/local.go b/local/local.go index fed260b..abf3793 100644 --- a/local/local.go +++ b/local/local.go @@ -18,6 +18,7 @@ import ( type SyncContext struct { DryRun bool + KeepCache bool TempDir string LbrynetAddr string ChannelID string @@ -55,6 +56,7 @@ func AddCommand(rootCmd *cobra.Command) { Args: cobra.ExactArgs(1), } cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") + cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") @@ -128,6 +130,14 @@ func localCmd(cmd *cobra.Command, args []string) { log.Errorf("Error while wating for stream to reflect: %v", err) } } + + if !syncContext.KeepCache { + log.Infof("Deleting local files.") + err = videoSource.DeleteLocalCache(videoID) + if err != nil { + log.Errorf("Error deleting local files for video %s: %v", videoID, err) + } + } log.Info("Done") } @@ -228,6 +238,7 @@ func getAbbrevDescription(v SourceVideo) string { type VideoSource interface { GetVideo(id string) (*SourceVideo, error) + DeleteLocalCache(id string) error } type VideoPublisher interface { diff --git a/local/ytdl.go b/local/ytdl.go index 712aa44..7baa893 100644 --- a/local/ytdl.go +++ b/local/ytdl.go @@ -49,6 +49,7 @@ func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) { return metadata, nil } + func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) { basePath := path.Join(y.DownloadDir, videoID) metadataPath := basePath + ".info.json" @@ -101,6 +102,42 @@ func (y *Ytdl) GetVideoFile(videoID string) (string, error) { return *videoPath, nil } +func (y *Ytdl) DeleteVideoFiles(videoID string) error { + files, err := ioutil.ReadDir(y.DownloadDir) + if err != nil { + return err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if strings.Contains(f.Name(), videoID) { + videoPath := path.Join(y.DownloadDir, f.Name()) + err = os.Remove(videoPath) + if err != nil { + log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err) + return err + } + } + } + + return nil +} + +func deleteFile(path string) error { + _, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if file %s exists: %v", path, err) + return err + } else if err != nil { + log.Debugf("File %s does not exist. Skipping deletion.", path) + return nil + } + + return os.Remove(path) +} + func findDownloadedVideo(videoDir, videoID string) (*string, error) { files, err := ioutil.ReadDir(videoDir) if err != nil { diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index a01b371..b68c18f 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -70,3 +70,7 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { return &sourceVideo, nil } + +func (s *YtdlVideoSource) DeleteLocalCache(id string) error { + return s.downloader.DeleteVideoFiles(id) +} -- 2.45.2 From ce901f6b01dca938c381d7730dea64caeeb6c579 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Wed, 17 Nov 2021 10:07:20 -0600 Subject: [PATCH 07/10] Add option to not wait for reflection. Add instructions for getting a YouTube API key. Fix some minor issues. --- local/local.go | 35 +++++++++++++++++++++-------------- local/localSDKPublisher.go | 11 ++++++++--- local/readme.md | 18 +++++++++++++++++- 3 files changed, 46 insertions(+), 18 deletions(-) diff --git a/local/local.go b/local/local.go index abf3793..9b7837e 100644 --- a/local/local.go +++ b/local/local.go @@ -17,12 +17,13 @@ import ( ) type SyncContext struct { - DryRun bool - KeepCache bool - TempDir string - LbrynetAddr string - ChannelID string - PublishBid float64 + DryRun bool + KeepCache bool + ReflectStreams bool + TempDir string + LbrynetAddr string + ChannelID string + PublishBid float64 YouTubeSourceConfig *YouTubeSourceConfig } @@ -57,6 +58,7 @@ func AddCommand(rootCmd *cobra.Command) { } cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") + cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") @@ -119,15 +121,19 @@ func localCmd(cmd *cobra.Command, args []string) { log.Debugf("Object to be published: %v", processedVideo) } else { - done, err := publisher.Publish(*processedVideo) + doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) if err != nil { log.Errorf("Error publishing video: %v", err) return } - err = <-done - if err != nil { - log.Errorf("Error while wating for stream to reflect: %v", err) + if syncContext.ReflectStreams { + err = <-doneReflectingCh + if err != nil { + log.Errorf("Error while wating for stream to reflect: %v", err) + } + } else { + log.Debugln("Not waiting for stream to reflect.") } } @@ -227,13 +233,14 @@ func getAbbrevDescription(v SourceVideo) string { return v.SourceURL } - maxLength := 2800 + additionalDescription := "\n...\n" + v.SourceURL + maxLength := 2800 - len(additionalDescription) + description := strings.TrimSpace(*v.Description) - additionalDescription := "\n" + v.SourceURL if len(description) > maxLength { description = description[:maxLength] } - return description + "\n..." + additionalDescription + return description + additionalDescription } type VideoSource interface { @@ -242,5 +249,5 @@ type VideoSource interface { } type VideoPublisher interface { - Publish(video PublishableVideo) (chan error, error) + Publish(video PublishableVideo, reflectStream bool) (chan error, error) } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go index 4e99355..3886eb9 100644 --- a/local/localSDKPublisher.go +++ b/local/localSDKPublisher.go @@ -48,7 +48,7 @@ func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*Local return &publisher, nil } -func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) { +func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) { streamCreateOptions := jsonrpc.StreamCreateOptions { ClaimCreateOptions: jsonrpc.ClaimCreateOptions { Title: &video.Title, @@ -67,6 +67,10 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) return nil, err } + if !reflectStream { + return nil, nil + } + done := make(chan error, 1) go func() { for { @@ -88,8 +92,9 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) break } if !fileStatus.UploadingToReflector { - log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") - break + log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + done <- errors.New("Stream is not being reflected (check lbrynet settings).") + return } log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) time.Sleep(5 * time.Second) diff --git a/local/readme.md b/local/readme.md index 16efe8b..81b0235 100644 --- a/local/readme.md +++ b/local/readme.md @@ -5,6 +5,7 @@ - LBRY SDK (what do we actually need this for?) - youtube-dl - enough space to cache stuff +- YouTube data API key ## Process @@ -15,6 +16,21 @@ - or easier, just error if no channel - enough lbc in wallet? +### Getting a YouTube API key + +To access the YouTube data API, you will first need some kind of google account. + +The API has two methods of authentication, OAuth2 and API keys. This application uses API keys. +These API keys are basically like passwords, and so once obtained, they should not be shared. + +The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application): + + +1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console. +2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**. + +To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys). + ### Options to figure out what's already synced - simplest: assume nothing is synced yet @@ -50,4 +66,4 @@ ### Debugging -- dry-running the whole thing \ No newline at end of file +- dry-running the whole thing -- 2.45.2 From e8adf6f6ce34bb57fd31d414630841462a9c6a2c Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Thu, 2 Dec 2021 11:17:16 -0600 Subject: [PATCH 08/10] Add local DB to track syncs in preparation for whole channel syncing. --- go.mod | 1 + go.sum | 1 + local/local.go | 44 ++++- local/localSDKPublisher.go | 19 +- local/syncDb.go | 395 +++++++++++++++++++++++++++++++++++++ local/ytdlVideoSource.go | 1 + 6 files changed, 455 insertions(+), 6 deletions(-) create mode 100644 local/syncDb.go diff --git a/go.mod b/go.mod index 40b2c65..0c2cdac 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/kr/pretty v0.2.1 // indirect github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262 + github.com/mattn/go-sqlite3 v1.10.0 github.com/miekg/dns v1.1.22 // indirect github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b github.com/opencontainers/go-digest v1.0.0-rc1 // indirect diff --git a/go.sum b/go.sum index 3b1c9d2..1d40796 100644 --- a/go.sum +++ b/go.sum @@ -318,6 +318,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/local/local.go b/local/local.go index 9b7837e..afd2d4f 100644 --- a/local/local.go +++ b/local/local.go @@ -21,6 +21,7 @@ type SyncContext struct { KeepCache bool ReflectStreams bool TempDir string + SyncDbPath string LbrynetAddr string ChannelID string PublishBid float64 @@ -31,6 +32,9 @@ func (c *SyncContext) Validate() error { if c.TempDir == "" { return errors.New("No TempDir provided") } + if c.SyncDbPath == "" { + return errors.New("No sync DB path provided") + } if c.LbrynetAddr == "" { return errors.New("No Lbrynet address provided") } @@ -60,6 +64,7 @@ func AddCommand(rootCmd *cobra.Command) { cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") @@ -87,6 +92,24 @@ func localCmd(cmd *cobra.Command, args []string) { log.Debugf("Running sync for video ID %s", videoID) + syncDB, err := NewSyncDb(syncContext.SyncDbPath) + if err != nil { + log.Errorf("Error creating sync DB: %v", err) + return + } + defer syncDB.Close() + + isSynced, claimID, err := syncDB.IsVideoPublished("YouTube", videoID) + if err != nil { + log.Errorf("Error checking if video is already synced: %v", err) + return + } + + if isSynced { + log.Infof("Video %s is already published as %s.", videoID, claimID) + return + } + var publisher VideoPublisher publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) if err != nil { @@ -109,6 +132,12 @@ func localCmd(cmd *cobra.Command, args []string) { return } + err = syncDB.SaveVideoData(*sourceVideo) + if err != nil { + log.Errorf("Error saving video data: %v", err) + return + } + processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) if err != nil { log.Errorf("Error processing source video for publishing: %v", err) @@ -121,11 +150,18 @@ func localCmd(cmd *cobra.Command, args []string) { log.Debugf("Object to be published: %v", processedVideo) } else { - doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) + claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) if err != nil { log.Errorf("Error publishing video: %v", err) return } + err = syncDB.SaveVideoPublication(*processedVideo, claimID) + if err != nil { + // Sync DB is corrupted after getting here + // and will allow double publication. + log.Errorf("Error saving video publication to sync DB: %v", err) + return + } if syncContext.ReflectStreams { err = <-doneReflectingCh @@ -149,6 +185,7 @@ func localCmd(cmd *cobra.Command, args []string) { type SourceVideo struct { ID string + Source string Title *string Description *string SourceURL string @@ -161,6 +198,7 @@ type SourceVideo struct { type PublishableVideo struct { ID string + Source string ClaimName string Title string Description string @@ -213,6 +251,8 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab } processed := PublishableVideo { + ID: source.ID, + Source: source.Source, ClaimName: claimName, Title: title, Description: getAbbrevDescription(source), @@ -249,5 +289,5 @@ type VideoSource interface { } type VideoPublisher interface { - Publish(video PublishableVideo, reflectStream bool) (chan error, error) + Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go index 3886eb9..c27a721 100644 --- a/local/localSDKPublisher.go +++ b/local/localSDKPublisher.go @@ -48,7 +48,7 @@ func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*Local return &publisher, nil } -func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) { +func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) { streamCreateOptions := jsonrpc.StreamCreateOptions { ClaimCreateOptions: jsonrpc.ClaimCreateOptions { Title: &video.Title, @@ -64,11 +64,22 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) if err != nil { - return nil, err + return "", nil, err + } + + var claimID *string + for _, output := range txSummary.Outputs { + if output.Type == "claim" { + claimID = &output.ClaimID + break + } + } + if claimID == nil { + return "", nil, errors.New("Publish transaction did not have a claim output.") } if !reflectStream { - return nil, nil + return "", nil, nil } done := make(chan error, 1) @@ -102,7 +113,7 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) done <- nil }() - return done, nil + return *claimID, done, nil } // if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed diff --git a/local/syncDb.go b/local/syncDb.go new file mode 100644 index 0000000..34b3eb0 --- /dev/null +++ b/local/syncDb.go @@ -0,0 +1,395 @@ +package local + +import ( + "database/sql" + _ "github.com/mattn/go-sqlite3" + log "github.com/sirupsen/logrus" +) + +type SyncDb struct { + db *sql.DB +} + +func NewSyncDb(path string) (*SyncDb, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + log.Errorf("Error opening cache DB at %s: %v", path, err) + return nil, err + } + + cache := SyncDb { + db: db, + } + err = cache.ensureSchema() + if err != nil { + log.Errorf("Error while ensuring sync DB structure: %v", err) + return nil, err + } + + return &cache, nil +} + +func (c *SyncDb) Close() error { + return c.db.Close() +} + +func (c *SyncDb) SaveVideoData(video SourceVideo) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + ) + if err != nil { + return err + } + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path, + claim_id +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path, + claim_id = excluded.claim_id; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + claimID, + ) + if err != nil { + return err + } + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) { + selectSql := ` +SELECT + claim_id +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var claimID sql.NullString + err := row.Scan(&claimID) + + if err == sql.ErrNoRows { + return false, "", nil + } else if err != nil { + log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err) + return false, "", err + } + + if claimID.Valid { + return true, claimID.String, nil + } else { + return false, "", nil + } +} + +func (c *SyncDb) GetSavedVideoData(source, id string) (*SourceVideo, *string, error) { + selectSql := ` +SELECT + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path, + claim_id +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var record syncRecord + err := row.Scan( + &record.nativeID, + &record.title, + &record.description, + &record.sourceURL, + &record.releaseTime, + &record.thumbnailURL, + &record.fullLocalPath, + &record.claimID, + ) + if err == sql.ErrNoRows { + log.Debugf("Data for YouTube:%s is not in the sync DB", id) + return nil, nil, nil + } else if err != nil { + log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err) + return nil, nil, err + } + sourceVideo, claimID := record.toSourceVideo() + + tags, err := c.getTags(source, id) + if err != nil { + return nil, nil, err + } + + languages, err := c.getLanguages(source, id) + if err != nil { + return nil, nil, err + } + + sourceVideo.Tags = tags + sourceVideo.Languages = languages + + return &sourceVideo, claimID, nil +} + +func (c *SyncDb) ensureSchema() error { + createSql := ` +CREATE TABLE IF NOT EXISTS videos ( + source TEXT, + native_id TEXT, + title TEXT, + description TEXT, + source_url TEXT, + release_time INT, + thumbnail_url TEXT, + full_local_path TEXT, + claim_id TEXT, + PRIMARY KEY (source, native_id) +); +CREATE TABLE IF NOT EXISTS video_tags ( + source TEXT NOT NULL, + native_id TEXT NOT NULL, + tag TEXT NOT NULL, + UNIQUE (source, native_id, tag) +); +CREATE TABLE IF NOT EXISTS video_languages ( + source TEXT NOT NULL, + native_id TEXT NOT NULL, + language TEXT NOT NULL, + UNIQUE (source, native_id, language) +); + ` + _, err := c.db.Exec(createSql) + return err +} + +func (c *SyncDb) upsertTags(source, id string, tags []string) error { + upsertSql := ` +INSERT INTO video_tags ( + source, + native_id, + tag +) VALUES (?, ?, ?) +ON CONFLICT (source, native_id, tag) +DO NOTHING; + ` + + for _, tag := range tags { + _, err := c.db.Exec( + upsertSql, + source, + id, + tag, + ) + if err != nil { + log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err) + return err + } + } + return nil +} + +func (c *SyncDb) getTags(source, id string) ([]string, error) { + selectSql := ` +SELECT tag +FROM video_tags +WHERE source = ? AND native_id = ?; + ` + + rows, err := c.db.Query(selectSql, source, id) + if err != nil { + log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + defer rows.Close() + + var tags []string + for rows.Next() { + var tag string + err = rows.Scan(&tag) + if err != nil { + log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + tags = append(tags, tag) + } + + return tags, nil +} + +func (c *SyncDb) upsertLanguages(source, id string, languages []string) error { + upsertSql := ` +INSERT INTO video_languages ( + source, + native_id, + language +) VALUES (?, ?, ?) +ON CONFLICT (source, native_id, language) +DO NOTHING; + ` + + for _, language := range languages { + _, err := c.db.Exec( + upsertSql, + source, + id, + language, + ) + if err != nil { + log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err) + return err + } + } + return nil +} + +func (c *SyncDb) getLanguages(source, id string) ([]string, error) { + selectSql := ` +SELECT language +FROM video_languages +WHERE source = ? AND native_id = ?; + ` + + rows, err := c.db.Query(selectSql, source, id) + if err != nil { + log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + defer rows.Close() + + var languages []string + for rows.Next() { + var language string + err = rows.Scan(&language) + if err != nil { + log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + languages = append(languages, language) + } + + return languages, nil +} + +type syncRecord struct { + source string + nativeID string + title sql.NullString + description sql.NullString + sourceURL string + releaseTime sql.NullInt64 + thumbnailURL sql.NullString + fullLocalPath string + claimID sql.NullString +} + +func (r *syncRecord) toSourceVideo() (SourceVideo, *string) { + video := SourceVideo { + ID: r.nativeID, + Source: r.source, + SourceURL: r.sourceURL, + FullLocalPath: r.fullLocalPath, + } + + if r.title.Valid { + video.Title = &r.title.String + } else { + video.Title = nil + } + + if r.description.Valid { + video.Description = &r.description.String + } else { + video.Description = nil + } + + if r.releaseTime.Valid { + video.ReleaseTime = &r.releaseTime.Int64 + } else { + video.ReleaseTime = nil + } + + if r.thumbnailURL.Valid { + video.ThumbnailURL = &r.thumbnailURL.String + } else { + video.ThumbnailURL = nil + } + + if r.claimID.Valid { + return video, &r.claimID.String + } else { + return video, nil + } +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index b68c18f..13080f9 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -49,6 +49,7 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { sourceVideo := SourceVideo { ID: id, + Source: "YouTube", Title: &metadata.Title, Description: &metadata.Description, SourceURL: "\nhttps://www.youtube.com/watch?v=" + id, -- 2.45.2 From 748921343974ff0779d5655dad256d8d81b4b7ef Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Mon, 6 Dec 2021 11:44:56 -0600 Subject: [PATCH 09/10] Rearrange code to prepare for a channel oriented process, and to prepare for other video sources. --- local/local.go | 51 +++++++++++++++++++++++++++++++--------- local/ytdlVideoSource.go | 4 ++-- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/local/local.go b/local/local.go index afd2d4f..12ca71d 100644 --- a/local/local.go +++ b/local/local.go @@ -24,6 +24,7 @@ type SyncContext struct { SyncDbPath string LbrynetAddr string ChannelID string + VideoID string PublishBid float64 YouTubeSourceConfig *YouTubeSourceConfig } @@ -44,11 +45,22 @@ func (c *SyncContext) Validate() error { if c.PublishBid <= 0.0 { return errors.New("Publish bid is not greater than zero") } + + if c.YouTubeSourceConfig.ChannelID != "" { + // Validate for YouTube source + // For now, an API key is required + if c.YouTubeSourceConfig.APIKey == "" { + return errors.New("YouTube source was selected, but no YouTube API key was provided.") + } + } else { + return errors.New("No video source provided") + } return nil } type YouTubeSourceConfig struct { - YouTubeAPIKey string + ChannelID string + APIKey string } var syncContext SyncContext @@ -58,7 +70,7 @@ func AddCommand(rootCmd *cobra.Command) { Use: "local", Short: "run a personal ytsync", Run: localCmd, - Args: cobra.ExactArgs(1), + Args: cobra.ExactArgs(0), } cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") @@ -68,10 +80,11 @@ func AddCommand(rootCmd *cobra.Command) { cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") + cmd.Flags().StringVar(&syncContext.VideoID, "video-id", "", "ID of video to sync. This will attempt to sync only this one video.") - // For now, assume source is always YouTube syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{} - cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key") + cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.APIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key") + cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.ChannelID, "youtube-channel", "", "YouTube Channel ID") rootCmd.AddCommand(cmd) } @@ -88,7 +101,11 @@ func localCmd(cmd *cobra.Command, args []string) { log.Error(err) return } - videoID := args[0] + videoID := syncContext.VideoID + if videoID == "" { + log.Errorf("Only single video mode is supported currently. Please provided a video ID.") + return + } log.Debugf("Running sync for video ID %s", videoID) @@ -126,22 +143,31 @@ func localCmd(cmd *cobra.Command, args []string) { } } + err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID) + if err != nil { + log.Errorf("Error syncing %s: %v", videoID, err) + return + } + log.Info("Done") +} + +func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error { sourceVideo, err := videoSource.GetVideo(videoID) if err != nil { log.Errorf("Error getting source video: %v", err) - return + return err } err = syncDB.SaveVideoData(*sourceVideo) if err != nil { log.Errorf("Error saving video data: %v", err) - return + return err } processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) if err != nil { log.Errorf("Error processing source video for publishing: %v", err) - return + return err } if syncContext.DryRun { @@ -153,20 +179,21 @@ func localCmd(cmd *cobra.Command, args []string) { claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) if err != nil { log.Errorf("Error publishing video: %v", err) - return + return err } err = syncDB.SaveVideoPublication(*processedVideo, claimID) if err != nil { // Sync DB is corrupted after getting here // and will allow double publication. log.Errorf("Error saving video publication to sync DB: %v", err) - return + return err } if syncContext.ReflectStreams { err = <-doneReflectingCh if err != nil { log.Errorf("Error while wating for stream to reflect: %v", err) + return err } } else { log.Debugln("Not waiting for stream to reflect.") @@ -178,9 +205,11 @@ func localCmd(cmd *cobra.Command, args []string) { err = videoSource.DeleteLocalCache(videoID) if err != nil { log.Errorf("Error deleting local files for video %s: %v", videoID, err) + return err } } - log.Info("Done") + + return nil } type SourceVideo struct { diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index 13080f9..e82a2a7 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -21,8 +21,8 @@ func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlV downloader: *ytdl, } - if config.YouTubeAPIKey != "" { - ytapiEnricher := NewYouTubeAPIVideoEnricher(config.YouTubeAPIKey) + if config.APIKey != "" { + ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey) source.enrichers = append(source.enrichers, ytapiEnricher) } -- 2.45.2 From aa16d32135c331518b0935d425ece6e829acd582 Mon Sep 17 00:00:00 2001 From: pseudoscalar <47653615+pseudoscalar@users.noreply.github.com> Date: Tue, 25 Jan 2022 09:36:46 -0600 Subject: [PATCH 10/10] Finish v2 (sync whole channel) --- local/local.go | 208 ++++++++++++++--- local/localSDKPublisher.go | 56 ++++- local/syncDb.go | 397 ++++++++++++++++++++++++++++----- local/youtubeChannelScanner.go | 51 +++++ local/youtubeEnricher.go | 29 +++ local/ytapi.go | 74 ++++++ local/ytdlVideoSource.go | 28 ++- 7 files changed, 747 insertions(+), 96 deletions(-) create mode 100644 local/youtubeChannelScanner.go diff --git a/local/local.go b/local/local.go index 12ca71d..2665743 100644 --- a/local/local.go +++ b/local/local.go @@ -20,6 +20,7 @@ type SyncContext struct { DryRun bool KeepCache bool ReflectStreams bool + ForceChannelScan bool TempDir string SyncDbPath string LbrynetAddr string @@ -75,6 +76,7 @@ func AddCommand(rootCmd *cobra.Command) { cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") + cmd.Flags().BoolVar(&syncContext.ForceChannelScan, "force-rescan", false, "Rescan channel to fill the sync DB.") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") @@ -101,13 +103,6 @@ func localCmd(cmd *cobra.Command, args []string) { log.Error(err) return } - videoID := syncContext.VideoID - if videoID == "" { - log.Errorf("Only single video mode is supported currently. Please provided a video ID.") - return - } - - log.Debugf("Running sync for video ID %s", videoID) syncDB, err := NewSyncDb(syncContext.SyncDbPath) if err != nil { @@ -116,17 +111,6 @@ func localCmd(cmd *cobra.Command, args []string) { } defer syncDB.Close() - isSynced, claimID, err := syncDB.IsVideoPublished("YouTube", videoID) - if err != nil { - log.Errorf("Error checking if video is already synced: %v", err) - return - } - - if isSynced { - log.Infof("Video %s is already published as %s.", videoID, claimID) - return - } - var publisher VideoPublisher publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) if err != nil { @@ -136,37 +120,121 @@ func localCmd(cmd *cobra.Command, args []string) { var videoSource VideoSource if syncContext.YouTubeSourceConfig != nil { - videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig) + videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig, syncDB) if err != nil { log.Errorf("Error setting up video source: %v", err) return } } - err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID) - if err != nil { - log.Errorf("Error syncing %s: %v", videoID, err) - return + latestPublishedReleaseTime := int64(0) + latestKnownReleaseTime := int64(0) + if syncContext.ForceChannelScan { + log.Infof("Channel scan is being forced.") + } else { + dbSummary, err := syncDB.GetSummary() + if err != nil { + log.Errorf("Error getting sync DB summary for update scan: %v", err) + return + } + latestPublishedReleaseTime = dbSummary.LatestPublished + latestKnownReleaseTime = dbSummary.LatestKnown + } + log.Debugf("Latest known release time: %d", latestKnownReleaseTime) + for result := range videoSource.Scan(latestKnownReleaseTime) { + if result.Error != nil { + log.Errorf("Error while discovering new videos from source: %v", result.Error) + } else { + syncDB.SaveKnownVideo(*result.Video) + } + } + log.Debugf("Latest published release time: %d", latestPublishedReleaseTime) + for result := range publisher.PublishedVideoIterator(latestPublishedReleaseTime) { + if result.Error != nil { + log.Errorf("Error while discovering published videos: %v", result.Error) + } else { + syncDB.SavePublishedVideo(*result.Video) + } + } + + var videoIDs []string + if syncContext.VideoID == "" { + videoIDs, err = syncDB.GetUnpublishedIDs(videoSource.SourceName()) + if err != nil { + log.Errorf("Error getting unpublished videos from sync DB: %v", err) + return + } + } else { + videoIDs = []string{ syncContext.VideoID } + } + log.Debugf("Syncing videos: %v", videoIDs) + for _, videoID := range videoIDs { + err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID) + if err != nil { + log.Errorf("Error syncing %s: %v", videoID, err) + return + } } log.Info("Done") } -func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error { +func cacheVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, videoID string) (*PublishableVideo, error) { + log.Debugf("Ensuring video %s:%s is cached", videoSource.SourceName(), videoID) + + videoRecord, err := syncDB.GetVideoRecord(videoSource.SourceName(), videoID, true, true) + if err != nil { + log.Errorf("Error checking if video is already cached: %v", err) + return nil, err + } + + if videoRecord != nil && videoRecord.FullLocalPath.Valid { + log.Debugf("%s:%s is already cached.", videoSource.SourceName(), videoID) + video := videoRecord.ToPublishableVideo() + if video == nil { + log.Warnf("%s:%s appears to be cached locally, but has missing data. Caching again.") + } + return video, nil + } + + log.Debugf("%s:%s is not cached locally. Caching now.", videoSource.SourceName(), videoID) sourceVideo, err := videoSource.GetVideo(videoID) if err != nil { log.Errorf("Error getting source video: %v", err) - return err - } - - err = syncDB.SaveVideoData(*sourceVideo) - if err != nil { - log.Errorf("Error saving video data: %v", err) - return err + return nil, err } processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) if err != nil { log.Errorf("Error processing source video for publishing: %v", err) + return nil, err + } + + err = syncDB.SavePublishableVideo(*processedVideo) + if err != nil { + log.Errorf("Error saving video data: %v", err) + return nil, err + } + + return processedVideo, nil +} + +func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error { + log.Debugf("Running sync for video %s:%s", videoSource.SourceName(), videoID) + + isSynced, claimID, err := syncDB.IsVideoPublished(videoSource.SourceName(), videoID) + if err != nil { + log.Errorf("Error checking if video is already synced: %v", err) + return err + } + + if isSynced { + log.Infof("Video %s:%s is already published as %s.", videoSource.SourceName(), videoID, claimID) + return nil + } + + processedVideo, err := cacheVideo(syncContext, syncDB, videoSource, videoID) + if err != nil { + log.Errorf("Error ensuring video is cached prior to publication: %v", err) return err } @@ -181,7 +249,7 @@ func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, log.Errorf("Error publishing video: %v", err) return err } - err = syncDB.SaveVideoPublication(*processedVideo, claimID) + err = syncDB.SavePublishedVideo((*processedVideo).ToPublished(claimID)) if err != nil { // Sync DB is corrupted after getting here // and will allow double publication. @@ -202,6 +270,11 @@ func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, if !syncContext.KeepCache { log.Infof("Deleting local files.") + err = syncDB.MarkVideoUncached(videoSource.SourceName(), videoID) + if err != nil { + log.Errorf("Error marking video %s:%s as uncached in syncDB", videoSource.SourceName(), videoID) + return err + } err = videoSource.DeleteLocalCache(videoID) if err != nil { log.Errorf("Error deleting local files for video %s: %v", videoID, err) @@ -222,7 +295,7 @@ type SourceVideo struct { Tags []string ReleaseTime *int64 ThumbnailURL *string - FullLocalPath string + FullLocalPath *string } type PublishableVideo struct { @@ -239,7 +312,59 @@ type PublishableVideo struct { FullLocalPath string } +func (v PublishableVideo) ToPublished(claimID string) PublishedVideo { + return PublishedVideo { + ClaimID: claimID, + NativeID: v.ID, + Source: v.Source, + ClaimName: v.ClaimName, + Title: v.Title, + Description: v.Description, + SourceURL: v.SourceURL, + Languages: v.Languages, + Tags: v.Tags, + ReleaseTime: v.ReleaseTime, + ThumbnailURL: v.ThumbnailURL, + FullLocalPath: v.FullLocalPath, + } +} + +type PublishedVideo struct { + ClaimID string + NativeID string + Source string + ClaimName string + Title string + Description string + SourceURL string + Languages []string + Tags []string + ReleaseTime int64 + ThumbnailURL string + FullLocalPath string +} + +func (v PublishedVideo) ToPublishable() PublishableVideo { + return PublishableVideo { + ID: v.NativeID, + Source: v.Source, + ClaimName: v.ClaimName, + Title: v.Title, + Description: v.Description, + SourceURL: v.SourceURL, + Languages: v.Languages, + Tags: v.Tags, + ReleaseTime: v.ReleaseTime, + ThumbnailURL: v.ThumbnailURL, + FullLocalPath: v.FullLocalPath, + } +} + func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) { + if source.FullLocalPath == nil { + return nil, errors.New("Video is not cached locally") + } + tags, err := tags_manager.SanitizeTags(source.Tags, channelID) if err != nil { log.Errorf("Error sanitizing tags: %v", err) @@ -289,7 +414,7 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab Tags: tags, ReleaseTime: *releaseTime, ThumbnailURL: *thumbnailURL, - FullLocalPath: source.FullLocalPath, + FullLocalPath: *source.FullLocalPath, } log.Debugf("Video prepared for publication: %v", processed) @@ -313,10 +438,23 @@ func getAbbrevDescription(v SourceVideo) string { } type VideoSource interface { + SourceName() string GetVideo(id string) (*SourceVideo, error) DeleteLocalCache(id string) error + Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult +} + +type SourceScanIteratorResult struct { + Video *SourceVideo + Error error } type VideoPublisher interface { - Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) + Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) + PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult +} + +type PublishedVideoIteratorResult struct { + Video *PublishedVideo + Error error } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go index c27a721..24c0124 100644 --- a/local/localSDKPublisher.go +++ b/local/localSDKPublisher.go @@ -48,7 +48,7 @@ func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*Local return &publisher, nil } -func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) { +func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) { streamCreateOptions := jsonrpc.StreamCreateOptions { ClaimCreateOptions: jsonrpc.ClaimCreateOptions { Title: &video.Title, @@ -116,6 +116,60 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) return *claimID, done, nil } +func (p *LocalSDKPublisher) PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult { + videoCh := make(chan PublishedVideoIteratorResult, 10) + go func() { + defer close(videoCh) + for page := uint64(0); ; page++ { + streams, err := p.lbrynet.StreamList(nil, page, 100) + if err != nil { + log.Errorf("Error listing streams (page %d): %v", page, err) + + errResult := PublishedVideoIteratorResult { + Error: err, + } + videoCh <- errResult + return + } + if len(streams.Items) == 0 { + return + } + + for _, stream := range streams.Items { + if stream.ChannelID != p.channelID || stream.Value.GetStream().ReleaseTime < sinceTimestamp { + continue + } + + languages := []string{} + for _, language := range stream.Value.Languages { + languages = append(languages, language.String()) + } + + video := PublishedVideo { + ClaimID: stream.ClaimID, + NativeID: "", + Source: "", + ClaimName: stream.Name, + Title: stream.Value.Title, + Description: stream.Value.Description, + Languages: languages, + Tags: stream.Value.Tags, + ReleaseTime: stream.Value.GetStream().ReleaseTime, + ThumbnailURL: stream.Value.Thumbnail.Url, + FullLocalPath: "", + } + + videoResult := PublishedVideoIteratorResult { + Video: &video, + } + videoCh <- videoResult + } + } + }() + + return videoCh +} + // if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { response, err := client.FileList(0, 20) diff --git a/local/syncDb.go b/local/syncDb.go index 34b3eb0..c3a4fb9 100644 --- a/local/syncDb.go +++ b/local/syncDb.go @@ -10,6 +10,14 @@ type SyncDb struct { db *sql.DB } +type SyncDbSummary struct { + Total int + CachedUnpublished int + UncachedUnpublished int + LatestKnown int64 + LatestPublished int64 +} + func NewSyncDb(path string) (*SyncDb, error) { db, err := sql.Open("sqlite3", path) if err != nil { @@ -33,7 +41,157 @@ func (c *SyncDb) Close() error { return c.db.Close() } -func (c *SyncDb) SaveVideoData(video SourceVideo) error { +func (c *SyncDb) SaveKnownVideo(video SourceVideo) error { + if video.ID == "" { + log.Warnf("Trying to save a video with no ID: %v", video) + } + insertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO NOTHING; + ` + + r := SyncRecordFromSourceVideo(video) + + _, err := c.db.Exec( + insertSql, + r.Source, + r.NativeID, + r.Title, + r.Description, + r.SourceURL, + r.ReleaseTime, + r.ThumbnailURL, + r.FullLocalPath, + ) + return err +} + +func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + claim_name, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + claim_name = excluded.claim_name, + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.ClaimName, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + ) + if err != nil { + return err + } + + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + claim_id, + claim_name, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + claim_id = excluded.claim_id, + claim_name = excluded.claim_name, + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.NativeID, + video.ClaimID, + video.ClaimName, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + ) + if err != nil { + return err + } + + + err = c.upsertTags(video.Source, video.NativeID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.NativeID, video.Languages) + return err +} + +func (c *SyncDb) MarkVideoUncached(source, id string) error { + updateSql := ` +UPDATE videos +SET full_local_path = NULL +WHERE source = ? AND native_id = ?; + ` + _, err := c.db.Exec( + updateSql, + source, + id, + ) + return err +} + +func (c *SyncDb) _SaveVideoData(video SourceVideo) error { upsertSql := ` INSERT INTO videos ( source, @@ -152,7 +310,33 @@ WHERE source = ? AND native_id = ? } } -func (c *SyncDb) GetSavedVideoData(source, id string) (*SourceVideo, *string, error) { +func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) { + selectSql := ` +SELECT + full_local_path +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var localPath sql.NullString + err := row.Scan(&localPath) + + if err == sql.ErrNoRows { + return false, "", nil + } else if err != nil { + log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err) + return false, "", err + } + + if localPath.Valid { + return true, localPath.String, nil + } else { + return false, "", nil + } +} + +func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) { selectSql := ` SELECT native_id, @@ -168,40 +352,107 @@ WHERE source = ? AND native_id = ? ` row := c.db.QueryRow(selectSql, source, id) - var record syncRecord + var record SyncRecord err := row.Scan( - &record.nativeID, - &record.title, - &record.description, - &record.sourceURL, - &record.releaseTime, - &record.thumbnailURL, - &record.fullLocalPath, - &record.claimID, + &record.NativeID, + &record.Title, + &record.Description, + &record.SourceURL, + &record.ReleaseTime, + &record.ThumbnailURL, + &record.FullLocalPath, + &record.ClaimID, ) if err == sql.ErrNoRows { - log.Debugf("Data for YouTube:%s is not in the sync DB", id) - return nil, nil, nil + log.Debugf("Data for %s:%s is not in the sync DB", source, id) + return nil, nil } else if err != nil { log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err) - return nil, nil, err + return nil, err } - sourceVideo, claimID := record.toSourceVideo() - tags, err := c.getTags(source, id) + if includeTags { + tags, err := c.getTags(source, id) + if err != nil { + return nil, err + } + record.Tags = &tags + } + + if includeLanguages { + languages, err := c.getLanguages(source, id) + if err != nil { + return nil, err + } + record.Languages = &languages + } + + return &record, nil +} + +func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) { + selectSql := ` +SELECT + native_id +FROM videos +WHERE source = ? AND claim_id IS NULL + ` + ids := []string{} + + rows, err := c.db.Query(selectSql, source) if err != nil { - return nil, nil, err + return ids, err + } + defer rows.Close() + + for rows.Next() { + var id string + err = rows.Scan(&id) + if err != nil { + return ids, err + } + ids = append(ids, id) } - languages, err := c.getLanguages(source, id) + return ids, nil +} + +func (c *SyncDb) GetSummary() (*SyncDbSummary, error) { + selectSql := ` +SELECT + COUNT() AS total, + COUNT(v_unpub.full_local_path) AS cached_unpublished, + COUNT(v_all.claim_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished, + MAX(v_all.release_time) AS latest_known, + MAX(v_pub.release_time) AS latest_published +FROM videos v_all +LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL +LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL + ` + row := c.db.QueryRow(selectSql) + + var summary SyncDbSummary + var latestKnown, latestPublished sql.NullInt64 + err := row.Scan( + &summary.Total, + &summary.CachedUnpublished, + &summary.UncachedUnpublished, + &latestKnown, + &latestPublished, + ) if err != nil { - return nil, nil, err + log.Errorf("Error querying sync DB summary: %v", err) + return nil, err } - sourceVideo.Tags = tags - sourceVideo.Languages = languages + if latestKnown.Valid { + summary.LatestKnown = latestKnown.Int64 + } + if latestPublished.Valid { + summary.LatestPublished = latestPublished.Int64 + } - return &sourceVideo, claimID, nil + return &summary, nil } func (c *SyncDb) ensureSchema() error { @@ -215,6 +466,7 @@ CREATE TABLE IF NOT EXISTS videos ( release_time INT, thumbnail_url TEXT, full_local_path TEXT, + claim_name TEXT, claim_id TEXT, PRIMARY KEY (source, native_id) ); @@ -343,53 +595,82 @@ WHERE source = ? AND native_id = ?; return languages, nil } -type syncRecord struct { - source string - nativeID string - title sql.NullString - description sql.NullString - sourceURL string - releaseTime sql.NullInt64 - thumbnailURL sql.NullString - fullLocalPath string - claimID sql.NullString +type SyncRecord struct { + Source string + NativeID string + Title sql.NullString + Description sql.NullString + SourceURL sql.NullString + ReleaseTime sql.NullInt64 + ThumbnailURL sql.NullString + FullLocalPath sql.NullString + ClaimID sql.NullString + Tags *[]string + Languages *[]string } -func (r *syncRecord) toSourceVideo() (SourceVideo, *string) { - video := SourceVideo { - ID: r.nativeID, - Source: r.source, - SourceURL: r.sourceURL, - FullLocalPath: r.fullLocalPath, +func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord { + r := SyncRecord { + Source: v.Source, + NativeID: v.ID, + SourceURL: sql.NullString { String: v.SourceURL, Valid: true }, } - if r.title.Valid { - video.Title = &r.title.String - } else { - video.Title = nil + if v.Title != nil { + r.Title = sql.NullString { String: *v.Title, Valid: true } } - if r.description.Valid { - video.Description = &r.description.String - } else { - video.Description = nil + if v.Description != nil { + r.Description = sql.NullString { String: *v.Description, Valid: true } } - if r.releaseTime.Valid { - video.ReleaseTime = &r.releaseTime.Int64 - } else { - video.ReleaseTime = nil + if v.ThumbnailURL != nil { + r.ThumbnailURL = sql.NullString { String: *v.ThumbnailURL, Valid: true } } - if r.thumbnailURL.Valid { - video.ThumbnailURL = &r.thumbnailURL.String - } else { - video.ThumbnailURL = nil + if v.FullLocalPath != nil { + r.FullLocalPath = sql.NullString { String: *v.FullLocalPath, Valid: true } } - if r.claimID.Valid { - return video, &r.claimID.String - } else { - return video, nil + if v.ReleaseTime != nil { + r.ReleaseTime = sql.NullInt64 { Int64: *v.ReleaseTime, Valid: true } } + + if len(v.Tags) > 0 { + r.Tags = &v.Tags + } + + if len(v.Languages) > 0 { + r.Languages = &v.Languages + } + + return r +} + +func (r *SyncRecord) ToPublishableVideo() *PublishableVideo { + if !(r.Title.Valid && + r.Description.Valid && + r.SourceURL.Valid && + r.ReleaseTime.Valid && + r.ThumbnailURL.Valid && + r.FullLocalPath.Valid && + r.Tags != nil && + r.Languages != nil) { + + return nil + } + + video := PublishableVideo { + ID: r.NativeID, + Source: r.Source, + Description: r.Description.String, + SourceURL: r.SourceURL.String, + ReleaseTime: r.ReleaseTime.Int64, + ThumbnailURL: r.ThumbnailURL.String, + FullLocalPath: r.FullLocalPath.String, + Tags: *r.Tags, + Languages: *r.Languages, + } + + return &video } diff --git a/local/youtubeChannelScanner.go b/local/youtubeChannelScanner.go new file mode 100644 index 0000000..7558c4a --- /dev/null +++ b/local/youtubeChannelScanner.go @@ -0,0 +1,51 @@ +package local + +type YouTubeChannelScanner interface { + Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult +} + +type YouTubeAPIChannelScanner struct { + api *YouTubeAPI + channel string +} + +func NewYouTubeAPIChannelScanner(apiKey, channel string) (*YouTubeAPIChannelScanner) { + scanner := YouTubeAPIChannelScanner { + api: NewYouTubeAPI(apiKey), + channel: channel, + } + return &scanner +} + +func (s *YouTubeAPIChannelScanner) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult { + videoCh := make(chan SourceScanIteratorResult, 10) + go func() { + defer close(videoCh) + + for firstRun, nextPage := true, ""; firstRun || nextPage != ""; { + var videos []SourceVideo + var err error + + firstRun = false + + videos, nextPage, err = s.api.GetChannelVideosPage(s.channel, sinceTimestamp, nextPage) + if err != nil { + videoCh <- SourceScanIteratorResult { + Video: nil, + Error: err, + } + return + } + + for _, video := range videos { + outVideo := video + videoCh <- SourceScanIteratorResult { + Video: &outVideo, + Error: nil, + } + } + } + }() + + return videoCh +} diff --git a/local/youtubeEnricher.go b/local/youtubeEnricher.go index 38369b5..0c36d7b 100644 --- a/local/youtubeEnricher.go +++ b/local/youtubeEnricher.go @@ -43,3 +43,32 @@ func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error { } return nil } + +type CacheVideoEnricher struct { + syncDB *SyncDb +} + +func NewCacheVideoEnricher(syncDB *SyncDb) *CacheVideoEnricher { + enricher := CacheVideoEnricher { + syncDB, + } + return &enricher +} + +func (e *CacheVideoEnricher) EnrichMissing(source *SourceVideo) error { + if source.ReleaseTime != nil { + log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID) + return nil + } + + cached, err := e.syncDB.GetVideoRecord(source.Source, source.ID, false, false) + if err != nil { + log.Errorf("Error getting cached video %s: %v", source.ID, err) + return err + } + + if cached != nil && cached.ReleaseTime.Valid { + source.ReleaseTime = &cached.ReleaseTime.Int64 + } + return nil +} diff --git a/local/ytapi.go b/local/ytapi.go index 1924446..1427901 100644 --- a/local/ytapi.go +++ b/local/ytapi.go @@ -8,6 +8,8 @@ import ( "time" log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/util" ) type YouTubeAPI struct { @@ -72,8 +74,80 @@ func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) { return &result.Items[0].Snippet, nil } +func (a *YouTubeAPI) GetChannelVideosPage(channelID string, publishedAfter int64, pageToken string) ([]SourceVideo, string, error) { + req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/search", nil) + if err != nil { + log.Errorf("Error creating http client for YouTube API: %v", err) + return []SourceVideo{}, "", err + } + + query := req.URL.Query() + query.Add("part", "snippet") + query.Add("type", "video") + query.Add("channelId", channelID) + query.Add("publishedAfter", time.Unix(publishedAfter, 0).Format(time.RFC3339)) + query.Add("maxResults", "5") + if pageToken != "" { + query.Add("pageToken", pageToken) + } + query.Add("key", a.apiKey) + req.URL.RawQuery = query.Encode() + + req.Header.Add("Accept", "application/json") + + resp, err := a.client.Do(req) + defer resp.Body.Close() + if err != nil { + log.Errorf("Error from YouTube API: %v", err) + return []SourceVideo{}, "", err + } + + body, err := io.ReadAll(resp.Body) + log.Tracef("Response from YouTube API: %s", string(body[:])) + + var result videoSearchResponse + err = json.Unmarshal(body, &result) + if err != nil { + log.Errorf("Error deserializing video list response from YouTube API: %v", err) + return []SourceVideo{}, "", err + } + + videos := []SourceVideo{} + for _, item := range result.Items { + var releaseTime *int64 + publishedAt, err := time.Parse(time.RFC3339, item.Snippet.PublishedAt) + if err != nil { + log.Errorf("Unable to parse publish time of %s while scanning YouTube channel %s: %v", item.ID.VideoID, channelID) + releaseTime = nil + } else { + releaseTime = util.PtrToInt64(publishedAt.Unix()) + } + + video := SourceVideo { + ID: item.ID.VideoID, + Source: "YouTube", + ReleaseTime: releaseTime, + } + videos = append(videos, video) + } + + return videos, result.NextPageToken, nil +} + type videoListResponse struct { + NextPageToken string `json:"nextPageToken"` Items []struct { + ID string `json:"id"` + Snippet VideoSnippet `json:"snippet"` + } `json:"items"` +} + +type videoSearchResponse struct { + NextPageToken string `json:"nextPageToken"` + Items []struct { + ID struct{ + VideoID string `json:"videoId"` + } `json:"id"` Snippet VideoSnippet `json:"snippet"` } `json:"items"` } diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index e82a2a7..2fd2a56 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -8,10 +8,11 @@ import ( type YtdlVideoSource struct { downloader Ytdl + channelScanner YouTubeChannelScanner enrichers []YouTubeVideoEnricher } -func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) { +func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig, syncDB *SyncDb) (*YtdlVideoSource, error) { ytdl, err := NewYtdl(downloadDir) if err != nil { return nil, err @@ -21,14 +22,27 @@ func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlV downloader: *ytdl, } + if syncDB != nil { + source.enrichers = append(source.enrichers, NewCacheVideoEnricher(syncDB)) + } + if config.APIKey != "" { ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey) source.enrichers = append(source.enrichers, ytapiEnricher) + source.channelScanner = NewYouTubeAPIChannelScanner(config.APIKey, config.ChannelID) + } + + if source.channelScanner == nil { + log.Warnf("No means of scanning source channels has been provided") } return &source, nil } +func (s *YtdlVideoSource) SourceName() string { + return "YouTube" +} + func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { metadata, err := s.downloader.GetVideoMetadata(id) if err != nil { @@ -57,7 +71,7 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { Tags: metadata.Tags, ReleaseTime: nil, ThumbnailURL: &bestThumbnail.URL, - FullLocalPath: videoPath, + FullLocalPath: &videoPath, } for _, enricher := range s.enrichers { @@ -75,3 +89,13 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { func (s *YtdlVideoSource) DeleteLocalCache(id string) error { return s.downloader.DeleteVideoFiles(id) } + +func (s *YtdlVideoSource) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult { + if s.channelScanner != nil { + return s.channelScanner.Scan(sinceTimestamp) + } + + videoCh := make(chan SourceScanIteratorResult, 1) + close(videoCh) + return videoCh +} -- 2.45.2