diff --git a/local/local.go b/local/local.go
index cacc57a..9b7837e 100644
--- a/local/local.go
+++ b/local/local.go
@@ -1,22 +1,305 @@
 package local
 
 import (
-	"fmt"
+	"errors"
+	"os"
+	"regexp"
+	"strings"
+	"time"
+	"unicode/utf8"
 
+	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
+	"github.com/abadojack/whatlanggo"
+
+	"github.com/lbryio/lbry.go/v2/extras/util"
+	"github.com/lbryio/ytsync/v5/namer"
+	"github.com/lbryio/ytsync/v5/tags_manager"
 )
 
+// maxDescriptionLength is the byte budget for a published description,
+// including the source-URL suffix appended by getAbbrevDescription.
+const maxDescriptionLength = 2800
+
+// urlsRegex matches http(s)/ftp URLs. It is compiled once at package scope
+// and used to strip links from text before language detection.
+var urlsRegex = regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
+
+// SyncContext holds the settings of one `local` command run. It is filled
+// in from command-line flags (some defaulting to environment variables).
+type SyncContext struct {
+	DryRun              bool
+	KeepCache           bool
+	ReflectStreams      bool
+	TempDir             string
+	LbrynetAddr         string
+	ChannelID           string
+	PublishBid          float64
+	YouTubeSourceConfig *YouTubeSourceConfig
+}
+
+// Validate reports the first required setting that is missing or invalid:
+// TempDir, LbrynetAddr and ChannelID must be non-empty and PublishBid must
+// be greater than zero.
+func (c *SyncContext) Validate() error {
+	if c.TempDir == "" {
+		return errors.New("No TempDir provided")
+	}
+	if c.LbrynetAddr == "" {
+		return errors.New("No Lbrynet address provided")
+	}
+	if c.ChannelID == "" {
+		return errors.New("No channel ID provided")
+	}
+	if c.PublishBid <= 0.0 {
+		return errors.New("Publish bid is not greater than zero")
+	}
+	return nil
+}
+
+// YouTubeSourceConfig holds the settings specific to a YouTube source.
+type YouTubeSourceConfig struct {
+	// YouTubeAPIKey is optional; when empty, no API enricher is attached.
+	YouTubeAPIKey string
+}
+
+// syncContext is the flag-backed configuration shared by AddCommand and
+// localCmd.
+var syncContext SyncContext
+
+// AddCommand registers the `local` subcommand and its flags on rootCmd.
 func AddCommand(rootCmd *cobra.Command) {
 	cmd := &cobra.Command{
 		Use:   "local",
 		Short: "run a personal ytsync",
 		Run:   localCmd,
+		Args:  cobra.ExactArgs(1),
 	}
 
-	//cmd.Flags().StringVar(&cache, "cache", "", "path to cache")
-	rootCmd.AddCommand(cmd)
+	cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
+	cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
+	cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
+	cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
+	cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
+	cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
+	cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
+	// For now, assume source is always YouTube
+	syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
+	cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
+	rootCmd.AddCommand(cmd)
+}
+
+// getEnvDefault returns the value of environment variable key when it is set
+// (even to an empty string), and defaultValue otherwise.
+func getEnvDefault(key, defaultValue string) string {
+	if value, ok := os.LookupEnv(key); ok {
+		return value
+	}
+	return defaultValue
 }
 
+// localCmd syncs the single video whose ID is args[0]: it fetches the video,
+// prepares it for publishing, publishes it (unless --dry-run) and finally
+// removes the cached files (unless --keep-cache). Failures are logged.
 func localCmd(cmd *cobra.Command, args []string) {
-	fmt.Println("local")
+	err := syncContext.Validate()
+	if err != nil {
+		log.Error(err)
+		return
+	}
+	videoID := args[0]
+
+	log.Debugf("Running sync for video ID %s", videoID)
+
+	var publisher VideoPublisher
+	publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
+	if err != nil {
+		log.Errorf("Error setting up publisher: %v", err)
+		return
+	}
+
+	var videoSource VideoSource
+	if syncContext.YouTubeSourceConfig != nil {
+		videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig)
+		if err != nil {
+			log.Errorf("Error setting up video source: %v", err)
+			return
+		}
+	}
+	if videoSource == nil {
+		// Guard against calling methods on a nil interface below.
+		log.Error("No video source configured")
+		return
+	}
+
+	sourceVideo, err := videoSource.GetVideo(videoID)
+	if err != nil {
+		log.Errorf("Error getting source video: %v", err)
+		return
+	}
+
+	processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
+	if err != nil {
+		log.Errorf("Error processing source video for publishing: %v", err)
+		return
+	}
+
+	if syncContext.DryRun {
+		log.Infoln("This is a dry run. Nothing will be published.")
+		log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
+		log.Debugf("Object to be published: %v", processedVideo)
+	} else {
+		doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
+		if err != nil {
+			log.Errorf("Error publishing video: %v", err)
+			return
+		}
+
+		if syncContext.ReflectStreams {
+			err = <-doneReflectingCh
+			if err != nil {
+				log.Errorf("Error while wating for stream to reflect: %v", err)
+			}
+		} else {
+			log.Debugln("Not waiting for stream to reflect.")
+		}
+	}
+
+	if !syncContext.KeepCache {
+		log.Infof("Deleting local files.")
+		err = videoSource.DeleteLocalCache(videoID)
+		if err != nil {
+			log.Errorf("Error deleting local files for video %s: %v", videoID, err)
+		}
+	}
+	log.Info("Done")
+}
+
+// SourceVideo describes a video as obtained from a VideoSource. Pointer
+// fields may be nil when no value is available for them.
+type SourceVideo struct {
+	ID            string
+	Title         *string
+	Description   *string
+	SourceURL     string
+	Languages     []string
+	Tags          []string
+	ReleaseTime   *int64
+	ThumbnailURL  *string
+	FullLocalPath string
+}
+
+// PublishableVideo is a SourceVideo with defaults applied and a claim name
+// chosen, ready to be handed to a VideoPublisher.
+type PublishableVideo struct {
+	ID            string
+	ClaimName     string
+	Title         string
+	Description   string
+	SourceURL     string
+	Languages     []string
+	Tags          []string
+	ReleaseTime   int64
+	ThumbnailURL  string
+	FullLocalPath string
+}
+
+// processVideoForPublishing converts source into a PublishableVideo for the
+// given channel: it sanitizes tags, detects the language from the
+// description (falling back to the title), generates a claim name and fills
+// in defaults for a missing thumbnail (empty) and release time (now).
+func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
+	tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
+	if err != nil {
+		log.Errorf("Error sanitizing tags: %v", err)
+		return nil, err
+	}
+
+	// URLs are stripped from the sample handed to language detection.
+	descriptionSample := ""
+	if source.Description != nil {
+		descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
+	}
+	descriptionInfo := whatlanggo.Detect(descriptionSample)
+
+	title := ""
+	if source.Title != nil {
+		title = *source.Title
+	}
+	titleInfo := whatlanggo.Detect(title)
+
+	var languages []string
+	if descriptionInfo.IsReliable() && descriptionInfo.Lang.Iso6391() != "" {
+		languages = []string{descriptionInfo.Lang.Iso6391()}
+	} else if titleInfo.IsReliable() && titleInfo.Lang.Iso6391() != "" {
+		languages = []string{titleInfo.Lang.Iso6391()}
+	}
+
+	claimName := namer.NewNamer().GetNextName(title)
+
+	thumbnailURL := source.ThumbnailURL
+	if thumbnailURL == nil {
+		thumbnailURL = util.PtrToString("")
+	}
+
+	releaseTime := source.ReleaseTime
+	if releaseTime == nil {
+		releaseTime = util.PtrToInt64(time.Now().Unix())
+	}
+
+	processed := PublishableVideo{
+		ID:            source.ID,
+		ClaimName:     claimName,
+		Title:         title,
+		Description:   getAbbrevDescription(source),
+		SourceURL:     source.SourceURL,
+		Languages:     languages,
+		Tags:          tags,
+		ReleaseTime:   *releaseTime,
+		ThumbnailURL:  *thumbnailURL,
+		FullLocalPath: source.FullLocalPath,
+	}
+
+	log.Debugf("Video prepared for publication: %v", processed)
+
+	return &processed, nil
+}
+
+// getAbbrevDescription returns the description to publish: the trimmed
+// source description followed by "\n...\n" and the source URL, with the
+// description cut so the result fits in maxDescriptionLength bytes. When
+// the video has no description, the source URL alone is returned.
+func getAbbrevDescription(v SourceVideo) string {
+	if v.Description == nil {
+		return v.SourceURL
+	}
+
+	additionalDescription := "\n...\n" + v.SourceURL
+	maxLength := maxDescriptionLength - len(additionalDescription)
+	if maxLength < 0 {
+		maxLength = 0
+	}
+
+	description := strings.TrimSpace(*v.Description)
+	if len(description) > maxLength {
+		// Back up to a rune boundary so a multi-byte character is never
+		// split, which would leave invalid UTF-8 in the description.
+		for maxLength > 0 && !utf8.RuneStart(description[maxLength]) {
+			maxLength--
+		}
+		description = description[:maxLength]
+	}
+	return description + additionalDescription
+}
+
+// VideoSource fetches videos and manages their locally cached files.
+type VideoSource interface {
+	GetVideo(id string) (*SourceVideo, error)
+	DeleteLocalCache(id string) error
+}
+
+// VideoPublisher publishes a prepared video. When reflectStream is true the
+// returned channel yields the outcome of reflecting the stream.
+type VideoPublisher interface {
+	Publish(video PublishableVideo, reflectStream bool) (chan error, error)
 }
diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go
new file mode 100644
index 0000000..3886eb9
--- /dev/null
+++ b/local/localSDKPublisher.go
@@ -0,0 +1,145 @@
+package local
+
+import (
+	"errors"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
+	"github.com/lbryio/lbry.go/v2/extras/util"
+)
+
+// LocalSDKPublisher publishes videos to a channel through a local LBRY SDK
+// (lbrynet) daemon reached over JSON-RPC.
+type LocalSDKPublisher struct {
+	channelID  string
+	publishBid float64
+	lbrynet    *jsonrpc.Client
+}
+
+// NewLocalSDKPublisher connects to the SDK at sdkAddr and returns a publisher
+// for channelID that bids publishBid on each stream claim. It returns an
+// error when the SDK status cannot be fetched or the SDK is not running.
+func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
+	lbrynet := jsonrpc.NewClient(sdkAddr)
+	lbrynet.SetRPCTimeout(5 * time.Minute)
+
+	status, err := lbrynet.Status()
+	if err != nil {
+		return nil, err
+	}
+
+	if !status.IsRunning {
+		return nil, errors.New("SDK is not running")
+	}
+
+	// Should check to see if the SDK owns the channel
+
+	// Should check to see if wallet is unlocked
+	// but jsonrpc.Client doesn't have WalletStatus method
+	// so skip for now
+
+	// Should check to see if streams are configured to be reflected and warn if not
+	// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
+	// so use File.UploadingToReflector as a proxy for now
+
+	publisher := LocalSDKPublisher{
+		channelID:  channelID,
+		publishBid: publishBid,
+		lbrynet:    lbrynet,
+	}
+	return &publisher, nil
+}
+
+// Publish creates a stream claim for video on the publisher's channel.
+//
+// When reflectStream is false it returns a nil channel as soon as the claim
+// is created. Otherwise it returns a buffered channel that receives exactly
+// one value once the wait for reflection ends: nil on success, or the error
+// that stopped the wait.
+func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) {
+	streamCreateOptions := jsonrpc.StreamCreateOptions{
+		ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
+			Title:        &video.Title,
+			Description:  &video.Description,
+			Languages:    video.Languages,
+			ThumbnailURL: &video.ThumbnailURL,
+			Tags:         video.Tags,
+		},
+		ReleaseTime: &video.ReleaseTime,
+		ChannelID:   &p.channelID,
+		License:     util.PtrToString("Copyrighted (contact publisher)"),
+	}
+
+	txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
+	if err != nil {
+		return nil, err
+	}
+
+	if !reflectStream {
+		return nil, nil
+	}
+
+	done := make(chan error, 1)
+	go func() {
+		// Poll the SDK's file list until the stream is fully reflected.
+		for {
+			fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
+			if err != nil {
+				log.Errorf("Error finding file by txid: %v", err)
+				done <- err
+				return
+			}
+			if fileListResponse == nil {
+				log.Errorf("Could not find file in list with correct txid")
+				// err is nil on this path, so a real error must be sent;
+				// sending err would report a missing file as success.
+				done <- errors.New("Could not find file in list with correct txid")
+				return
+			}
+
+			fileStatus := fileListResponse.Items[fileIndex]
+			if fileStatus.IsFullyReflected {
+				log.Info("Stream is fully reflected")
+				break
+			}
+			if !fileStatus.UploadingToReflector {
+				log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
+				done <- errors.New("Stream is not being reflected (check lbrynet settings).")
+				return
+			}
+			log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
+			time.Sleep(5 * time.Second)
+		}
+		done <- nil
+	}()
+
+	return done, nil
+}
+
+// findFileByTxid pages through the SDK's file list, 20 items at a time, and
+// returns the page containing the file whose Txid equals txid together with
+// its index in that page. It returns (nil, 0, nil) when no file matches.
+//
+// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
+func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
+	response, err := client.FileList(0, 20)
+	for {
+		if err != nil {
+			log.Errorf("Error getting file list page: %v", err)
+			return nil, 0, err
+		}
+		// Linear scan: a binary search (sort.Search) needs a monotonic
+		// predicate over sorted data, which an equality test on txid is not.
+		for i := range response.Items {
+			if response.Items[i].Txid == txid {
+				return response, i, nil
+			}
+		}
+		if response.Page >= response.TotalPages {
+			return nil, 0, nil
+		}
+		response, err = client.FileList(response.Page+1, 20)
+	}
+}
diff --git a/local/readme.md b/local/readme.md
index 16efe8b..81b0235 100644
--- a/local/readme.md
+++ b/local/readme.md
@@ -5,6 +5,7 @@
 - LBRY SDK (what do we actually need this for?)
 - youtube-dl
 - enough space to cache stuff
+- YouTube data API key
 
 ## Process
 
@@ -15,6 +16,21 @@
   - or easier, just error if no channel
 - enough lbc in wallet?
 
+### Getting a YouTube API key
+
+To access the YouTube data API, you will first need some kind of Google account.
+
+The API has two methods of authentication, OAuth2 and API keys. This application uses API keys.
+These API keys are basically like passwords, and so once obtained, they should not be shared.
+
+The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application):
+
+
+1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console.
+2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**.
+
+To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys).
+
 ### Options to figure out what's already synced
 
 - simplest: assume nothing is synced yet
@@ -50,4 +66,4 @@
 
 ### Debugging
 
-- dry-running the whole thing
\ No newline at end of file
+- dry-running the whole thing
diff --git a/local/youtubeEnricher.go b/local/youtubeEnricher.go
new file mode 100644
index 0000000..38369b5
--- /dev/null
+++ b/local/youtubeEnricher.go
@@ -0,0 +1,55 @@
+package local
+
+import (
+	"time"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/lbryio/lbry.go/v2/extras/util"
+)
+
+// YouTubeVideoEnricher fills in fields of a SourceVideo that are still
+// missing after the video metadata was first retrieved.
+type YouTubeVideoEnricher interface {
+	EnrichMissing(source *SourceVideo) error
+}
+
+// YouTubeAPIVideoEnricher enriches videos using the YouTube data API.
+type YouTubeAPIVideoEnricher struct {
+	api *YouTubeAPI
+}
+
+// NewYouTubeAPIVideoEnricher returns an enricher that queries the YouTube
+// data API with apiKey.
+func NewYouTubeAPIVideoEnricher(apiKey string) *YouTubeAPIVideoEnricher {
+	enricher := YouTubeAPIVideoEnricher{
+		api: NewYouTubeAPI(apiKey),
+	}
+	return &enricher
+}
+
+// EnrichMissing sets source.ReleaseTime from the video's publishedAt
+// timestamp when it is not already set. It returns an error only when the
+// snippet cannot be fetched; an unparsable timestamp is logged and
+// ReleaseTime is left nil.
+func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
+	if source.ReleaseTime != nil {
+		log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
+		return nil
+	}
+
+	snippet, err := e.api.GetVideoSnippet(source.ID)
+	if err != nil {
+		// Both format verbs need an argument: the video ID and the error.
+		log.Errorf("Error snippet data for video %s: %v", source.ID, err)
+		return err
+	}
+
+	publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt)
+	if err != nil {
+		log.Errorf("Error converting publishedAt to timestamp: %v", err)
+	} else {
+		source.ReleaseTime = util.PtrToInt64(publishedAt.Unix())
+	}
+	return nil
+}
diff --git a/local/ytapi.go b/local/ytapi.go
new file mode 100644
index 0000000..1924446
--- /dev/null
+++ b/local/ytapi.go
@@ -0,0 +1,104 @@
+package local
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+)
+
+// YouTubeAPI is a minimal client for the YouTube data API v3.
+type YouTubeAPI struct {
+	apiKey string
+	client *http.Client
+}
+
+// NewYouTubeAPI returns a client that authenticates with apiKey.
+func NewYouTubeAPI(apiKey string) *YouTubeAPI {
+	client := &http.Client{
+		// Bound every request so a stalled connection cannot hang the sync.
+		Timeout: 30 * time.Second,
+		Transport: &http.Transport{
+			MaxIdleConns:       10,
+			IdleConnTimeout:    30 * time.Second,
+			DisableCompression: true,
+		},
+	}
+
+	api := YouTubeAPI{
+		apiKey: apiKey,
+		client: client,
+	}
+
+	return &api
+}
+
+// GetVideoSnippet fetches the "snippet" part of the video with the given ID
+// from the videos endpoint. It returns an error when the request fails, the
+// API answers with a non-200 status, the body cannot be read or decoded, or
+// the response does not contain exactly one item.
+func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
+	req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil)
+	if err != nil {
+		log.Errorf("Error creating http client for YouTube API: %v", err)
+		return nil, err
+	}
+
+	query := req.URL.Query()
+	query.Add("part", "snippet")
+	query.Add("id", videoID)
+	query.Add("key", a.apiKey)
+	req.URL.RawQuery = query.Encode()
+
+	req.Header.Add("Accept", "application/json")
+
+	resp, err := a.client.Do(req)
+	if err != nil {
+		log.Errorf("Error from YouTube API: %v", err)
+		return nil, err
+	}
+	// Deferred only after the error check: resp is nil when Do fails.
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		log.Errorf("Error reading response from YouTube API: %v", err)
+		return nil, err
+	}
+	log.Tracef("Response from YouTube API: %s", string(body))
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("YouTube API responded with status %s", resp.Status)
+	}
+
+	var result videoListResponse
+	err = json.Unmarshal(body, &result)
+	if err != nil {
+		log.Errorf("Error deserializing video list response from YouTube API: %v", err)
+		return nil, err
+	}
+
+	if len(result.Items) != 1 {
+		err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID)
+		return nil, err
+	}
+
+	return &result.Items[0].Snippet, nil
+}
+
+// videoListResponse mirrors the subset of the videos endpoint response that
+// is decoded here.
+type videoListResponse struct {
+	Items []struct {
+		Snippet VideoSnippet `json:"snippet"`
+	} `json:"items"`
+}
+
+// VideoSnippet holds the snippet fields used by this package.
+// PublishedAt is parsed by EnrichMissing as an RFC 3339 timestamp.
+type VideoSnippet struct {
+	PublishedAt string `json:"publishedAt"`
+}
diff --git a/local/ytdl.go b/local/ytdl.go
new file mode 100644
index 0000000..7baa893
--- /dev/null
+++ b/local/ytdl.go
@@ -0,0 +1,246 @@
+package local
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path"
+	"strings"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/lbryio/ytsync/v5/downloader/ytdl"
+)
+
+// Ytdl downloads video metadata and video files with the yt-dlp command-line
+// tool, keeping them in DownloadDir.
+type Ytdl struct {
+	DownloadDir string
+}
+
+// NewYtdl returns a Ytdl that stores its files in downloadDir.
+func NewYtdl(downloadDir string) (*Ytdl, error) {
+	// TODO validate download dir
+
+	y := Ytdl{
+		DownloadDir: downloadDir,
+	}
+
+	return &y, nil
+}
+
+// GetVideoMetadata returns the parsed yt-dlp info JSON for videoID,
+// downloading it first when it is not in the download directory yet.
+func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) {
+	metadataPath, err := y.GetVideoMetadataFile(videoID)
+	if err != nil {
+		return nil, err
+	}
+
+	metadataBytes, err := os.ReadFile(metadataPath)
+	if err != nil {
+		return nil, err
+	}
+
+	var metadata *ytdl.YtdlVideo
+	err = json.Unmarshal(metadataBytes, &metadata)
+	if err != nil {
+		return nil, err
+	}
+
+	return metadata, nil
+}
+
+// GetVideoMetadataFile returns the path of the "<videoID>.info.json" file in
+// the download directory, downloading the metadata when the file is missing.
+func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) {
+	basePath := path.Join(y.DownloadDir, videoID)
+	metadataPath := basePath + ".info.json"
+
+	_, err := os.Stat(metadataPath)
+	if err != nil && !os.IsNotExist(err) {
+		log.Errorf("Error determining if video metadata already exists: %v", err)
+		return "", err
+	} else if err != nil {
+		log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID)
+		err = downloadVideoMetadata(basePath, videoID)
+		if err != nil {
+			return "", err
+		}
+	}
+
+	return metadataPath, nil
+}
+
+// GetVideoFile returns the path of the downloaded .mp4 file for videoID,
+// downloading the video first when no such file exists yet.
+func (y *Ytdl) GetVideoFile(videoID string) (string, error) {
+	videoPath, err := findDownloadedVideo(y.DownloadDir, videoID)
+	if err != nil {
+		return "", err
+	}
+
+	if videoPath != nil {
+		return *videoPath, nil
+	}
+
+	basePath := path.Join(y.DownloadDir, videoID)
+	metadataPath, err := y.GetVideoMetadataFile(videoID)
+	if err != nil {
+		log.Errorf("Error getting metadata path in preparation for video download: %v", err)
+		return "", err
+	}
+	err = downloadVideo(basePath, metadataPath)
+	if err != nil {
+		// Propagate the failure; returning nil here would hand the caller
+		// an empty path that looks like success.
+		return "", err
+	}
+
+	videoPath, err = findDownloadedVideo(y.DownloadDir, videoID)
+	if err != nil {
+		log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err)
+		return "", err
+	}
+	if videoPath == nil {
+		return "", errors.New("Could not find a downloaded video after successful download.")
+	}
+
+	return *videoPath, nil
+}
+
+// DeleteVideoFiles removes every non-directory entry in the download
+// directory whose name contains videoID. It stops at the first entry that
+// cannot be removed and returns that error.
+func (y *Ytdl) DeleteVideoFiles(videoID string) error {
+	files, err := ioutil.ReadDir(y.DownloadDir)
+	if err != nil {
+		return err
+	}
+
+	for _, f := range files {
+		if f.IsDir() {
+			continue
+		}
+		if strings.Contains(f.Name(), videoID) {
+			videoPath := path.Join(y.DownloadDir, f.Name())
+			err = os.Remove(videoPath)
+			if err != nil {
+				// Report the file that failed, not the directory.
+				log.Errorf("Error while deleting file %s: %v", videoPath, err)
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// deleteFile removes the file at path. A file that does not exist is not an
+// error.
+func deleteFile(path string) error {
+	_, err := os.Stat(path)
+	if err != nil && !os.IsNotExist(err) {
+		log.Errorf("Error determining if file %s exists: %v", path, err)
+		return err
+	} else if err != nil {
+		log.Debugf("File %s does not exist. Skipping deletion.", path)
+		return nil
+	}
+
+	return os.Remove(path)
+}
+
+// findDownloadedVideo returns the path of the first .mp4 file in videoDir
+// whose name contains videoID, or nil when there is none.
+func findDownloadedVideo(videoDir, videoID string) (*string, error) {
+	files, err := ioutil.ReadDir(videoDir)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, f := range files {
+		if f.IsDir() {
+			continue
+		}
+		if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) {
+			videoPath := path.Join(videoDir, f.Name())
+			return &videoPath, nil
+		}
+	}
+	return nil, nil
+}
+
+// downloadVideoMetadata runs yt-dlp to write the info JSON for videoID,
+// using basePath as the output template, without downloading the video.
+func downloadVideoMetadata(basePath, videoID string) error {
+	ytdlArgs := []string{
+		"--skip-download",
+		"--write-info-json",
+		"--force-overwrites",
+		fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
+		"--cookies",
+		"cookies.txt",
+		"-o",
+		basePath,
+	}
+	ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
+	output, err := runCmd(ytdlCmd)
+	log.Debug(output)
+	return err
+}
+
+// downloadVideo runs yt-dlp to download the video described by the info JSON
+// at metadataPath, using basePath as the output template and mp4 as format.
+func downloadVideo(basePath, metadataPath string) error {
+	ytdlArgs := []string{
+		"--no-progress",
+		"-o",
+		basePath,
+		"--merge-output-format",
+		"mp4",
+		"--postprocessor-args",
+		"ffmpeg:-movflags faststart",
+		"--abort-on-unavailable-fragment",
+		"--fragment-retries",
+		"1",
+		"--cookies",
+		"cookies.txt",
+		"--extractor-args",
+		"youtube:player_client=android",
+		"--load-info-json",
+		metadataPath,
+		"-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]",
+	}
+
+	ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
+	output, err := runCmd(ytdlCmd)
+	log.Debug(output)
+	return err
+}
+
+// runCmd runs cmd to completion and returns its standard output split into
+// lines ("\r\n" is treated like "\n"). When the command fails, its standard
+// error output is logged and the error is returned.
+func runCmd(cmd *exec.Cmd) ([]string, error) {
+	log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
+
+	// Let os/exec copy both streams into buffers. Reading a stdout pipe to
+	// EOF before touching stderr can deadlock once the child fills the
+	// stderr pipe, and an early return would skip Wait for a started child.
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
+
+	if err := cmd.Run(); err != nil {
+		if stderr.Len() > 0 {
+			log.Error(stderr.String())
+		}
+		return nil, err
+	}
+	return strings.Split(strings.Replace(stdout.String(), "\r\n", "\n", -1), "\n"), nil
+}
diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go
new file mode 100644
index 0000000..b68c18f
--- /dev/null
+++ b/local/ytdlVideoSource.go
@@ -0,0 +1,93 @@
+package local
+
+import (
+	log "github.com/sirupsen/logrus"
+
+	"github.com/lbryio/ytsync/v5/downloader/ytdl"
+)
+
+// YtdlVideoSource is a VideoSource backed by yt-dlp downloads, optionally
+// followed by a chain of enrichers that fill in missing fields.
+type YtdlVideoSource struct {
+	downloader Ytdl
+	enrichers  []YouTubeVideoEnricher
+}
+
+// NewYtdlVideoSource returns a source that downloads into downloadDir. When
+// config carries a YouTube API key, a YouTubeAPIVideoEnricher is attached.
+func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) {
+	downloader, err := NewYtdl(downloadDir)
+	if err != nil {
+		return nil, err
+	}
+
+	source := YtdlVideoSource{
+		downloader: *downloader,
+	}
+
+	if config.YouTubeAPIKey != "" {
+		ytapiEnricher := NewYouTubeAPIVideoEnricher(config.YouTubeAPIKey)
+		source.enrichers = append(source.enrichers, ytapiEnricher)
+	}
+
+	return &source, nil
+}
+
+// GetVideo obtains the metadata and the video file for id (downloading what
+// is not cached) and returns the resulting SourceVideo. The widest thumbnail
+// is used; ThumbnailURL is nil when the metadata lists none. Enricher errors
+// are logged and do not fail the call.
+func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
+	metadata, err := s.downloader.GetVideoMetadata(id)
+	if err != nil {
+		return nil, err
+	}
+
+	videoPath, err := s.downloader.GetVideoFile(id)
+	if err != nil {
+		return nil, err
+	}
+
+	// Index the slice rather than taking the address of the range variable:
+	// before Go 1.22 that variable is reused, so the pointer would end up
+	// referring to the last thumbnail instead of the widest one.
+	var bestThumbnail *ytdl.Thumbnail
+	for i := range metadata.Thumbnails {
+		if bestThumbnail == nil || bestThumbnail.Width < metadata.Thumbnails[i].Width {
+			bestThumbnail = &metadata.Thumbnails[i]
+		}
+	}
+	// A video without thumbnails must not cause a nil dereference.
+	var thumbnailURL *string
+	if bestThumbnail != nil {
+		thumbnailURL = &bestThumbnail.URL
+	}
+
+	sourceVideo := SourceVideo{
+		ID:            id,
+		Title:         &metadata.Title,
+		Description:   &metadata.Description,
+		SourceURL:     "\nhttps://www.youtube.com/watch?v=" + id,
+		Languages:     []string{},
+		Tags:          metadata.Tags,
+		ReleaseTime:   nil,
+		ThumbnailURL:  thumbnailURL,
+		FullLocalPath: videoPath,
+	}
+
+	for _, enricher := range s.enrichers {
+		err = enricher.EnrichMissing(&sourceVideo)
+		if err != nil {
+			log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err)
+		}
+	}
+
+	log.Debugf("Source video retrieved via ytdl: %v", sourceVideo)
+
+	return &sourceVideo, nil
+}
+
+// DeleteLocalCache removes the files downloaded for the video id.
+func (s *YtdlVideoSource) DeleteLocalCache(id string) error {
+	return s.downloader.DeleteVideoFiles(id)
+}