diff --git a/go.mod b/go.mod index 40b2c65..0c2cdac 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/kr/pretty v0.2.1 // indirect github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262 + github.com/mattn/go-sqlite3 v1.10.0 github.com/miekg/dns v1.1.22 // indirect github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b github.com/opencontainers/go-digest v1.0.0-rc1 // indirect diff --git a/go.sum b/go.sum index 3b1c9d2..1d40796 100644 --- a/go.sum +++ b/go.sum @@ -318,6 +318,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/local/local.go b/local/local.go index cacc57a..2665743 100644 --- a/local/local.go +++ b/local/local.go @@ -1,22 +1,460 @@ package local import ( - "fmt" + "errors" + "os" + "regexp" + "strings" + "time" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/abadojack/whatlanggo" + + "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/lbryio/ytsync/v5/namer" + "github.com/lbryio/ytsync/v5/tags_manager" ) +type SyncContext struct { + DryRun bool + KeepCache bool + ReflectStreams bool + ForceChannelScan bool + TempDir string + SyncDbPath string + LbrynetAddr string + ChannelID string + VideoID string + PublishBid float64 + YouTubeSourceConfig *YouTubeSourceConfig +} + +func (c *SyncContext) Validate() error { + if c.TempDir == "" { + return errors.New("No TempDir provided") + } + if c.SyncDbPath == "" { + return errors.New("No sync DB path provided") + } + if c.LbrynetAddr == "" { + return errors.New("No Lbrynet address provided") + } + if c.ChannelID == "" { + return errors.New("No channel ID provided") + } + if c.PublishBid <= 0.0 { + return errors.New("Publish bid is not greater than zero") + } + + if c.YouTubeSourceConfig.ChannelID != "" { + // Validate for YouTube source + // For now, an API key is required + if c.YouTubeSourceConfig.APIKey == "" { + return errors.New("YouTube source was selected, but no YouTube API key was provided.") + } + } else { + return errors.New("No video source provided") + } + return nil +} + +type YouTubeSourceConfig struct { + ChannelID string + APIKey string +} + +var syncContext SyncContext + func AddCommand(rootCmd *cobra.Command) { cmd := &cobra.Command{ Use: "local", Short: "run a personal ytsync", Run: localCmd, + Args: cobra.ExactArgs(0), } - //cmd.Flags().StringVar(&cache, "cache", "", "path to cache") - rootCmd.AddCommand(cmd) + cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") + cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") + cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") + 
cmd.Flags().BoolVar(&syncContext.ForceChannelScan, "force-rescan", false, "Rescan channel to fill the sync DB.") + cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB") + cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") + cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") + cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") + cmd.Flags().StringVar(&syncContext.VideoID, "video-id", "", "ID of video to sync. This will attempt to sync only this one video.") + syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{} + cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.APIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key") + cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.ChannelID, "youtube-channel", "", "YouTube Channel ID") + rootCmd.AddCommand(cmd) +} + +func getEnvDefault(key, defaultValue string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return defaultValue } func localCmd(cmd *cobra.Command, args []string) { - fmt.Println("local") + err := syncContext.Validate() + if err != nil { + log.Error(err) + return + } + + syncDB, err := NewSyncDb(syncContext.SyncDbPath) + if err != nil { + log.Errorf("Error creating sync DB: %v", err) + return + } + defer syncDB.Close() + + var publisher VideoPublisher + publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) + if err != nil { + log.Errorf("Error setting up publisher: %v", err) + return + } + + var videoSource VideoSource + if syncContext.YouTubeSourceConfig != nil { + videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig, syncDB) + if err != nil { + log.Errorf("Error setting up video source: %v", err) + return + } + } + + latestPublishedReleaseTime := int64(0) + latestKnownReleaseTime := int64(0) + if syncContext.ForceChannelScan { + log.Infof("Channel scan is being forced.") + } else { + dbSummary, err := syncDB.GetSummary() + if err != nil { + log.Errorf("Error getting sync DB summary for update scan: %v", err) + return + } + latestPublishedReleaseTime = dbSummary.LatestPublished + latestKnownReleaseTime = dbSummary.LatestKnown + } + log.Debugf("Latest known release time: %d", latestKnownReleaseTime) + for result := range videoSource.Scan(latestKnownReleaseTime) { + if result.Error != nil { + log.Errorf("Error while discovering new videos from source: %v", result.Error) + } else { + syncDB.SaveKnownVideo(*result.Video) + } + } + log.Debugf("Latest published release time: %d", latestPublishedReleaseTime) + for result := range publisher.PublishedVideoIterator(latestPublishedReleaseTime) { + if result.Error != nil { + log.Errorf("Error while discovering published videos: %v", result.Error) + } else { + syncDB.SavePublishedVideo(*result.Video) + } + } + + var videoIDs []string + if syncContext.VideoID == "" { + videoIDs, err = syncDB.GetUnpublishedIDs(videoSource.SourceName()) + if err != nil { + log.Errorf("Error getting unpublished videos from sync DB: %v", err) + return + } + } else { + videoIDs = []string{ syncContext.VideoID } + } + log.Debugf("Syncing videos: %v", videoIDs) + for _, 
videoID := range videoIDs { + err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID) + if err != nil { + log.Errorf("Error syncing %s: %v", videoID, err) + return + } + } + log.Info("Done") +} + +func cacheVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, videoID string) (*PublishableVideo, error) { + log.Debugf("Ensuring video %s:%s is cached", videoSource.SourceName(), videoID) + + videoRecord, err := syncDB.GetVideoRecord(videoSource.SourceName(), videoID, true, true) + if err != nil { + log.Errorf("Error checking if video is already cached: %v", err) + return nil, err + } + + if videoRecord != nil && videoRecord.FullLocalPath.Valid { + log.Debugf("%s:%s is already cached.", videoSource.SourceName(), videoID) + video := videoRecord.ToPublishableVideo() + if video == nil { + log.Warnf("%s:%s appears to be cached locally, but has missing data. Caching again.") + } + return video, nil + } + + log.Debugf("%s:%s is not cached locally. Caching now.", videoSource.SourceName(), videoID) + sourceVideo, err := videoSource.GetVideo(videoID) + if err != nil { + log.Errorf("Error getting source video: %v", err) + return nil, err + } + + processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) + if err != nil { + log.Errorf("Error processing source video for publishing: %v", err) + return nil, err + } + + err = syncDB.SavePublishableVideo(*processedVideo) + if err != nil { + log.Errorf("Error saving video data: %v", err) + return nil, err + } + + return processedVideo, nil +} + +func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error { + log.Debugf("Running sync for video %s:%s", videoSource.SourceName(), videoID) + + isSynced, claimID, err := syncDB.IsVideoPublished(videoSource.SourceName(), videoID) + if err != nil { + log.Errorf("Error checking if video is already synced: %v", err) + return err + } + + if isSynced { + log.Infof("Video %s:%s is already published as %s.", videoSource.SourceName(), videoID, claimID) + return nil + } + + processedVideo, err := cacheVideo(syncContext, syncDB, videoSource, videoID) + if err != nil { + log.Errorf("Error ensuring video is cached prior to publication: %v", err) + return err + } + + if syncContext.DryRun { + log.Infoln("This is a dry run. Nothing will be published.") + log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName) + log.Debugf("Object to be published: %v", processedVideo) + + } else { + claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) + if err != nil { + log.Errorf("Error publishing video: %v", err) + return err + } + err = syncDB.SavePublishedVideo((*processedVideo).ToPublished(claimID)) + if err != nil { + // Sync DB is corrupted after getting here + // and will allow double publication. 
+ log.Errorf("Error saving video publication to sync DB: %v", err) + return err + } + + if syncContext.ReflectStreams { + err = <-doneReflectingCh + if err != nil { + log.Errorf("Error while wating for stream to reflect: %v", err) + return err + } + } else { + log.Debugln("Not waiting for stream to reflect.") + } + } + + if !syncContext.KeepCache { + log.Infof("Deleting local files.") + err = syncDB.MarkVideoUncached(videoSource.SourceName(), videoID) + if err != nil { + log.Errorf("Error marking video %s:%s as uncached in syncDB", videoSource.SourceName(), videoID) + return err + } + err = videoSource.DeleteLocalCache(videoID) + if err != nil { + log.Errorf("Error deleting local files for video %s: %v", videoID, err) + return err + } + } + + return nil +} + +type SourceVideo struct { + ID string + Source string + Title *string + Description *string + SourceURL string + Languages []string + Tags []string + ReleaseTime *int64 + ThumbnailURL *string + FullLocalPath *string +} + +type PublishableVideo struct { + ID string + Source string + ClaimName string + Title string + Description string + SourceURL string + Languages []string + Tags []string + ReleaseTime int64 + ThumbnailURL string + FullLocalPath string +} + +func (v PublishableVideo) ToPublished(claimID string) PublishedVideo { + return PublishedVideo { + ClaimID: claimID, + NativeID: v.ID, + Source: v.Source, + ClaimName: v.ClaimName, + Title: v.Title, + Description: v.Description, + SourceURL: v.SourceURL, + Languages: v.Languages, + Tags: v.Tags, + ReleaseTime: v.ReleaseTime, + ThumbnailURL: v.ThumbnailURL, + FullLocalPath: v.FullLocalPath, + } +} + +type PublishedVideo struct { + ClaimID string + NativeID string + Source string + ClaimName string + Title string + Description string + SourceURL string + Languages []string + Tags []string + ReleaseTime int64 + ThumbnailURL string + FullLocalPath string +} + +func (v PublishedVideo) ToPublishable() PublishableVideo { + return PublishableVideo { + ID: v.NativeID, + Source: v.Source, + ClaimName: v.ClaimName, + Title: v.Title, + Description: v.Description, + SourceURL: v.SourceURL, + Languages: v.Languages, + Tags: v.Tags, + ReleaseTime: v.ReleaseTime, + ThumbnailURL: v.ThumbnailURL, + FullLocalPath: v.FullLocalPath, + } +} + +func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) { + if source.FullLocalPath == nil { + return nil, errors.New("Video is not cached locally") + } + + tags, err := tags_manager.SanitizeTags(source.Tags, channelID) + if err != nil { + log.Errorf("Error sanitizing tags: %v", err) + return nil, err + } + + descriptionSample := "" + if source.Description != nil { + urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) + descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "") + } + info := whatlanggo.Detect(descriptionSample) + + title := "" + if source.Title != nil { + title = *source.Title + } + info2 := whatlanggo.Detect(title) + var languages []string = nil + if info.IsReliable() && info.Lang.Iso6391() != "" { + language := info.Lang.Iso6391() + languages = []string{language} + } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { + language := info2.Lang.Iso6391() + languages = []string{language} + } + + claimName := namer.NewNamer().GetNextName(title) + + thumbnailURL := source.ThumbnailURL + if thumbnailURL == nil { + thumbnailURL = util.PtrToString("") + } + + releaseTime := source.ReleaseTime + if releaseTime == nil { + releaseTime = util.PtrToInt64(time.Now().Unix()) + } 
+ + processed := PublishableVideo { + ID: source.ID, + Source: source.Source, + ClaimName: claimName, + Title: title, + Description: getAbbrevDescription(source), + Languages: languages, + Tags: tags, + ReleaseTime: *releaseTime, + ThumbnailURL: *thumbnailURL, + FullLocalPath: *source.FullLocalPath, + } + + log.Debugf("Video prepared for publication: %v", processed) + + return &processed, nil +} + +func getAbbrevDescription(v SourceVideo) string { + if v.Description == nil { + return v.SourceURL + } + + additionalDescription := "\n...\n" + v.SourceURL + maxLength := 2800 - len(additionalDescription) + + description := strings.TrimSpace(*v.Description) + if len(description) > maxLength { + description = description[:maxLength] + } + return description + additionalDescription +} + +type VideoSource interface { + SourceName() string + GetVideo(id string) (*SourceVideo, error) + DeleteLocalCache(id string) error + Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult +} + +type SourceScanIteratorResult struct { + Video *SourceVideo + Error error +} + +type VideoPublisher interface { + Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) + PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult +} + +type PublishedVideoIteratorResult struct { + Video *PublishedVideo + Error error } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go new file mode 100644 index 0000000..24c0124 --- /dev/null +++ b/local/localSDKPublisher.go @@ -0,0 +1,190 @@ +package local + +import ( + "errors" + "sort" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type LocalSDKPublisher struct { + channelID string + publishBid float64 + lbrynet *jsonrpc.Client +} + +func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) { + lbrynet := jsonrpc.NewClient(sdkAddr) + lbrynet.SetRPCTimeout(5 * time.Minute) + + status, err := lbrynet.Status() + if err != nil { + return nil, err + } + + if !status.IsRunning { + return nil, errors.New("SDK is not running") + } + + // Should check to see if the SDK owns the channel + + // Should check to see if wallet is unlocked + // but jsonrpc.Client doesn't have WalletStatus method + // so skip for now + + // Should check to see if streams are configured to be reflected and warn if not + // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected + // so use File.UploadingToReflector as a proxy for now + + publisher := LocalSDKPublisher { + channelID: channelID, + publishBid: publishBid, + lbrynet: lbrynet, + } + return &publisher, nil +} + +func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) { + streamCreateOptions := jsonrpc.StreamCreateOptions { + ClaimCreateOptions: jsonrpc.ClaimCreateOptions { + Title: &video.Title, + Description: &video.Description, + Languages: video.Languages, + ThumbnailURL: &video.ThumbnailURL, + Tags: video.Tags, + }, + ReleaseTime: &video.ReleaseTime, + ChannelID: &p.channelID, + License: util.PtrToString("Copyrighted (contact publisher)"), + } + + txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) + if err != nil { + return "", nil, err + } + + var claimID *string + for _, output := range txSummary.Outputs { + if output.Type == "claim" { + claimID = &output.ClaimID + break + } + } + if claimID == nil { + 
return "", nil, errors.New("Publish transaction did not have a claim output.") + } + + if !reflectStream { + return "", nil, nil + } + + done := make(chan error, 1) + go func() { + for { + fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid) + if err != nil { + log.Errorf("Error finding file by txid: %v", err) + done <- err + return + } + if fileListResponse == nil { + log.Errorf("Could not find file in list with correct txid") + done <- err + return + } + + fileStatus := fileListResponse.Items[fileIndex] + if fileStatus.IsFullyReflected { + log.Info("Stream is fully reflected") + break + } + if !fileStatus.UploadingToReflector { + log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + done <- errors.New("Stream is not being reflected (check lbrynet settings).") + return + } + log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) + time.Sleep(5 * time.Second) + } + done <- nil + }() + + return *claimID, done, nil +} + +func (p *LocalSDKPublisher) PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult { + videoCh := make(chan PublishedVideoIteratorResult, 10) + go func() { + defer close(videoCh) + for page := uint64(0); ; page++ { + streams, err := p.lbrynet.StreamList(nil, page, 100) + if err != nil { + log.Errorf("Error listing streams (page %d): %v", page, err) + + errResult := PublishedVideoIteratorResult { + Error: err, + } + videoCh <- errResult + return + } + if len(streams.Items) == 0 { + return + } + + for _, stream := range streams.Items { + if stream.ChannelID != p.channelID || stream.Value.GetStream().ReleaseTime < sinceTimestamp { + continue + } + + languages := []string{} + for _, language := range stream.Value.Languages { + languages = append(languages, language.String()) + } + + video := PublishedVideo { + ClaimID: stream.ClaimID, + NativeID: "", + Source: "", + ClaimName: stream.Name, + Title: stream.Value.Title, + Description: stream.Value.Description, + Languages: languages, + Tags: stream.Value.Tags, + ReleaseTime: stream.Value.GetStream().ReleaseTime, + ThumbnailURL: stream.Value.Thumbnail.Url, + FullLocalPath: "", + } + + videoResult := PublishedVideoIteratorResult { + Video: &video, + } + videoCh <- videoResult + } + } + }() + + return videoCh +} + +// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed +func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { + response, err := client.FileList(0, 20) + for { + if err != nil { + log.Errorf("Error getting file list page: %v", err) + return nil, 0, err + } + index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) + if index < len(response.Items) { + return response, index, nil + } + if response.Page >= response.TotalPages { + return nil, 0, nil + } + response, err = client.FileList(response.Page + 1, 20) + } +} diff --git a/local/readme.md b/local/readme.md index 16efe8b..81b0235 100644 --- a/local/readme.md +++ b/local/readme.md @@ -5,6 +5,7 @@ - LBRY SDK (what do we actually need this for?) - youtube-dl - enough space to cache stuff +- YouTube data API key ## Process @@ -15,6 +16,21 @@ - or easier, just error if no channel - enough lbc in wallet? +### Getting a YouTube API key + +To access the YouTube data API, you will first need some kind of google account. + +The API has two methods of authentication, OAuth2 and API keys. This application uses API keys. 
+These API keys are basically like passwords, and so once obtained, they should not be shared. + +The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application): + + +1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console. +2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**. + +To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys). + ### Options to figure out what's already synced - simplest: assume nothing is synced yet @@ -50,4 +66,4 @@ ### Debugging -- dry-running the whole thing \ No newline at end of file +- dry-running the whole thing diff --git a/local/syncDb.go b/local/syncDb.go new file mode 100644 index 0000000..c3a4fb9 --- /dev/null +++ b/local/syncDb.go @@ -0,0 +1,676 @@ +package local + +import ( + "database/sql" + _ "github.com/mattn/go-sqlite3" + log "github.com/sirupsen/logrus" +) + +type SyncDb struct { + db *sql.DB +} + +type SyncDbSummary struct { + Total int + CachedUnpublished int + UncachedUnpublished int + LatestKnown int64 + LatestPublished int64 +} + +func NewSyncDb(path string) (*SyncDb, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + log.Errorf("Error opening cache DB at %s: %v", path, err) + return nil, err + } + + cache := SyncDb { + db: db, + } + err = cache.ensureSchema() + if err != nil { + log.Errorf("Error while ensuring sync DB structure: %v", err) + return nil, err + } + + return &cache, nil +} + +func (c *SyncDb) Close() error { + return c.db.Close() +} + +func (c *SyncDb) SaveKnownVideo(video SourceVideo) error { + if video.ID == "" { + log.Warnf("Trying to save a video with no ID: %v", video) + } + insertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO NOTHING; + ` + + r := SyncRecordFromSourceVideo(video) + + _, err := c.db.Exec( + insertSql, + r.Source, + r.NativeID, + r.Title, + r.Description, + r.SourceURL, + r.ReleaseTime, + r.ThumbnailURL, + r.FullLocalPath, + ) + return err +} + +func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + claim_name, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ON CONFLICT (source, native_id) +DO UPDATE SET + claim_name = excluded.claim_name, + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.ClaimName, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + ) + if err != nil { + return err + } + + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + claim_id, + claim_name, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + claim_id = excluded.claim_id, + claim_name = excluded.claim_name, + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.NativeID, + video.ClaimID, + video.ClaimName, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + ) + if err != nil { + return err + } + + + err = c.upsertTags(video.Source, video.NativeID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.NativeID, video.Languages) + return err +} + +func (c *SyncDb) MarkVideoUncached(source, id string) error { + updateSql := ` +UPDATE videos +SET full_local_path = NULL +WHERE source = ? AND native_id = ?; + ` + _, err := c.db.Exec( + updateSql, + source, + id, + ) + return err +} + +func (c *SyncDb) _SaveVideoData(video SourceVideo) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + ) + if err != nil { + return err + } + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path, + claim_id +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ON CONFLICT (source, native_id) +DO UPDATE SET + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path, + claim_id = excluded.claim_id; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + claimID, + ) + if err != nil { + return err + } + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) { + selectSql := ` +SELECT + claim_id +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var claimID sql.NullString + err := row.Scan(&claimID) + + if err == sql.ErrNoRows { + return false, "", nil + } else if err != nil { + log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err) + return false, "", err + } + + if claimID.Valid { + return true, claimID.String, nil + } else { + return false, "", nil + } +} + +func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) { + selectSql := ` +SELECT + full_local_path +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var localPath sql.NullString + err := row.Scan(&localPath) + + if err == sql.ErrNoRows { + return false, "", nil + } else if err != nil { + log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err) + return false, "", err + } + + if localPath.Valid { + return true, localPath.String, nil + } else { + return false, "", nil + } +} + +func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) { + selectSql := ` +SELECT + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path, + claim_id +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var record SyncRecord + err := row.Scan( + &record.NativeID, + &record.Title, + &record.Description, + &record.SourceURL, + &record.ReleaseTime, + &record.ThumbnailURL, + &record.FullLocalPath, + &record.ClaimID, + ) + if err == sql.ErrNoRows { + log.Debugf("Data for %s:%s is not in the sync DB", source, id) + return nil, nil + } else if err != nil { + log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err) + return nil, err + } + + if includeTags { + tags, err := c.getTags(source, id) + if err != nil { + return nil, err + } + record.Tags = &tags + } + + if includeLanguages { + languages, err := c.getLanguages(source, id) + if err != nil { + return nil, err + } + record.Languages = &languages + } + + return &record, nil +} + +func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) { + selectSql := ` +SELECT + native_id +FROM videos +WHERE source = ? 
AND claim_id IS NULL + ` + ids := []string{} + + rows, err := c.db.Query(selectSql, source) + if err != nil { + return ids, err + } + defer rows.Close() + + for rows.Next() { + var id string + err = rows.Scan(&id) + if err != nil { + return ids, err + } + ids = append(ids, id) + } + + return ids, nil +} + +func (c *SyncDb) GetSummary() (*SyncDbSummary, error) { + selectSql := ` +SELECT + COUNT() AS total, + COUNT(v_unpub.full_local_path) AS cached_unpublished, + COUNT(v_all.claim_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished, + MAX(v_all.release_time) AS latest_known, + MAX(v_pub.release_time) AS latest_published +FROM videos v_all +LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL +LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL + ` + row := c.db.QueryRow(selectSql) + + var summary SyncDbSummary + var latestKnown, latestPublished sql.NullInt64 + err := row.Scan( + &summary.Total, + &summary.CachedUnpublished, + &summary.UncachedUnpublished, + &latestKnown, + &latestPublished, + ) + if err != nil { + log.Errorf("Error querying sync DB summary: %v", err) + return nil, err + } + + if latestKnown.Valid { + summary.LatestKnown = latestKnown.Int64 + } + if latestPublished.Valid { + summary.LatestPublished = latestPublished.Int64 + } + + return &summary, nil +} + +func (c *SyncDb) ensureSchema() error { + createSql := ` +CREATE TABLE IF NOT EXISTS videos ( + source TEXT, + native_id TEXT, + title TEXT, + description TEXT, + source_url TEXT, + release_time INT, + thumbnail_url TEXT, + full_local_path TEXT, + claim_name TEXT, + claim_id TEXT, + PRIMARY KEY (source, native_id) +); +CREATE TABLE IF NOT EXISTS video_tags ( + source TEXT NOT NULL, + native_id TEXT NOT NULL, + tag TEXT NOT NULL, + UNIQUE (source, native_id, tag) +); +CREATE TABLE IF NOT EXISTS video_languages ( + source TEXT NOT NULL, + native_id TEXT NOT NULL, + language TEXT NOT NULL, + UNIQUE (source, native_id, language) +); + ` + _, err := c.db.Exec(createSql) + return err +} + +func (c *SyncDb) upsertTags(source, id string, tags []string) error { + upsertSql := ` +INSERT INTO video_tags ( + source, + native_id, + tag +) VALUES (?, ?, ?) +ON CONFLICT (source, native_id, tag) +DO NOTHING; + ` + + for _, tag := range tags { + _, err := c.db.Exec( + upsertSql, + source, + id, + tag, + ) + if err != nil { + log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err) + return err + } + } + return nil +} + +func (c *SyncDb) getTags(source, id string) ([]string, error) { + selectSql := ` +SELECT tag +FROM video_tags +WHERE source = ? AND native_id = ?; + ` + + rows, err := c.db.Query(selectSql, source, id) + if err != nil { + log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + defer rows.Close() + + var tags []string + for rows.Next() { + var tag string + err = rows.Scan(&tag) + if err != nil { + log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + tags = append(tags, tag) + } + + return tags, nil +} + +func (c *SyncDb) upsertLanguages(source, id string, languages []string) error { + upsertSql := ` +INSERT INTO video_languages ( + source, + native_id, + language +) VALUES (?, ?, ?) 
+ON CONFLICT (source, native_id, language) +DO NOTHING; + ` + + for _, language := range languages { + _, err := c.db.Exec( + upsertSql, + source, + id, + language, + ) + if err != nil { + log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err) + return err + } + } + return nil +} + +func (c *SyncDb) getLanguages(source, id string) ([]string, error) { + selectSql := ` +SELECT language +FROM video_languages +WHERE source = ? AND native_id = ?; + ` + + rows, err := c.db.Query(selectSql, source, id) + if err != nil { + log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + defer rows.Close() + + var languages []string + for rows.Next() { + var language string + err = rows.Scan(&language) + if err != nil { + log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + languages = append(languages, language) + } + + return languages, nil +} + +type SyncRecord struct { + Source string + NativeID string + Title sql.NullString + Description sql.NullString + SourceURL sql.NullString + ReleaseTime sql.NullInt64 + ThumbnailURL sql.NullString + FullLocalPath sql.NullString + ClaimID sql.NullString + Tags *[]string + Languages *[]string +} + +func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord { + r := SyncRecord { + Source: v.Source, + NativeID: v.ID, + SourceURL: sql.NullString { String: v.SourceURL, Valid: true }, + } + + if v.Title != nil { + r.Title = sql.NullString { String: *v.Title, Valid: true } + } + + if v.Description != nil { + r.Description = sql.NullString { String: *v.Description, Valid: true } + } + + if v.ThumbnailURL != nil { + r.ThumbnailURL = sql.NullString { String: *v.ThumbnailURL, Valid: true } + } + + if v.FullLocalPath != nil { + r.FullLocalPath = sql.NullString { String: *v.FullLocalPath, Valid: true } + } + + if v.ReleaseTime != nil { + r.ReleaseTime = sql.NullInt64 { Int64: *v.ReleaseTime, Valid: true } + } + + if len(v.Tags) > 0 { + r.Tags = &v.Tags + } + + if len(v.Languages) > 0 { + r.Languages = &v.Languages + } + + return r +} + +func (r *SyncRecord) ToPublishableVideo() *PublishableVideo { + if !(r.Title.Valid && + r.Description.Valid && + r.SourceURL.Valid && + r.ReleaseTime.Valid && + r.ThumbnailURL.Valid && + r.FullLocalPath.Valid && + r.Tags != nil && + r.Languages != nil) { + + return nil + } + + video := PublishableVideo { + ID: r.NativeID, + Source: r.Source, + Description: r.Description.String, + SourceURL: r.SourceURL.String, + ReleaseTime: r.ReleaseTime.Int64, + ThumbnailURL: r.ThumbnailURL.String, + FullLocalPath: r.FullLocalPath.String, + Tags: *r.Tags, + Languages: *r.Languages, + } + + return &video +} diff --git a/local/youtubeChannelScanner.go b/local/youtubeChannelScanner.go new file mode 100644 index 0000000..7558c4a --- /dev/null +++ b/local/youtubeChannelScanner.go @@ -0,0 +1,51 @@ +package local + +type YouTubeChannelScanner interface { + Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult +} + +type YouTubeAPIChannelScanner struct { + api *YouTubeAPI + channel string +} + +func NewYouTubeAPIChannelScanner(apiKey, channel string) (*YouTubeAPIChannelScanner) { + scanner := YouTubeAPIChannelScanner { + api: NewYouTubeAPI(apiKey), + channel: channel, + } + return &scanner +} + +func (s *YouTubeAPIChannelScanner) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult { + videoCh := make(chan SourceScanIteratorResult, 10) + go func() { + defer close(videoCh) + + for firstRun, nextPage := true, ""; 
firstRun || nextPage != ""; {
+			var videos []SourceVideo
+			var err error
+
+			firstRun = false
+
+			videos, nextPage, err = s.api.GetChannelVideosPage(s.channel, sinceTimestamp, nextPage)
+			if err != nil {
+				videoCh <- SourceScanIteratorResult {
+					Video: nil,
+					Error: err,
+				}
+				return
+			}
+
+			for _, video := range videos {
+				outVideo := video
+				videoCh <- SourceScanIteratorResult {
+					Video: &outVideo,
+					Error: nil,
+				}
+			}
+		}
+	}()
+
+	return videoCh
+}
diff --git a/local/youtubeEnricher.go b/local/youtubeEnricher.go
new file mode 100644
index 0000000..0c36d7b
--- /dev/null
+++ b/local/youtubeEnricher.go
@@ -0,0 +1,74 @@
+package local
+
+import (
+	"time"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/lbryio/lbry.go/v2/extras/util"
+)
+
+type YouTubeVideoEnricher interface {
+	EnrichMissing(source *SourceVideo) error
+}
+
+type YouTubeAPIVideoEnricher struct {
+	api *YouTubeAPI
+}
+
+func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) {
+	enricher := YouTubeAPIVideoEnricher{
+		api: NewYouTubeAPI(apiKey),
+	}
+	return &enricher
+}
+
+func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
+	if source.ReleaseTime != nil {
+		log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
+		return nil
+	}
+
+	snippet, err := e.api.GetVideoSnippet(source.ID)
+	if err != nil {
+		log.Errorf("Error getting snippet data for video %s: %v", source.ID, err)
+		return err
+	}
+
+	publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt)
+	if err != nil {
+		log.Errorf("Error converting publishedAt to timestamp: %v", err)
+	} else {
+		source.ReleaseTime = util.PtrToInt64(publishedAt.Unix())
+	}
+	return nil
+}
+
+type CacheVideoEnricher struct {
+	syncDB *SyncDb
+}
+
+func NewCacheVideoEnricher(syncDB *SyncDb) *CacheVideoEnricher {
+	enricher := CacheVideoEnricher {
+		syncDB,
+	}
+	return &enricher
+}
+
+func (e *CacheVideoEnricher) EnrichMissing(source *SourceVideo) error {
+	if source.ReleaseTime != nil {
+		log.Debugf("Video %s does not need enrichment. CacheVideoEnricher is skipping.", source.ID)
+		return nil
+	}
+
+	cached, err := e.syncDB.GetVideoRecord(source.Source, source.ID, false, false)
+	if err != nil {
+		log.Errorf("Error getting cached video %s: %v", source.ID, err)
+		return err
+	}
+
+	if cached != nil && cached.ReleaseTime.Valid {
+		source.ReleaseTime = &cached.ReleaseTime.Int64
+	}
+	return nil
+}
diff --git a/local/ytapi.go b/local/ytapi.go
new file mode 100644
index 0000000..1427901
--- /dev/null
+++ b/local/ytapi.go
@@ -0,0 +1,157 @@
+package local
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/lbryio/lbry.go/v2/extras/util"
+)
+
+type YouTubeAPI struct {
+	apiKey string
+	client *http.Client
+}
+
+func NewYouTubeAPI(apiKey string) (*YouTubeAPI) {
+	client := &http.Client {
+		Transport: &http.Transport{
+			MaxIdleConns: 10,
+			IdleConnTimeout: 30 * time.Second,
+			DisableCompression: true,
+		},
+	}
+
+	api := YouTubeAPI {
+		apiKey: apiKey,
+		client: client,
+	}
+
+	return &api
+}
+
+func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
+	req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil)
+	if err != nil {
+		log.Errorf("Error creating http request for YouTube API: %v", err)
+		return nil, err
+	}
+
+	query := req.URL.Query()
+	query.Add("part", "snippet")
+	query.Add("id", videoID)
+	query.Add("key", a.apiKey)
+	req.URL.RawQuery = query.Encode()
+
+	req.Header.Add("Accept", "application/json")
+
+	resp, err := a.client.Do(req)
+	if err != nil {
+		log.Errorf("Error from YouTube API: %v", err)
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	log.Tracef("Response from YouTube API: %s", string(body[:]))
+
+	var result videoListResponse
+	err = json.Unmarshal(body, &result)
+	if err != nil {
+		log.Errorf("Error deserializing video list response from YouTube API: %v", err)
+		return nil, err
+	}
+
+	if len(result.Items) != 1 {
+		err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID)
+		return nil, err
+	}
+
+	return &result.Items[0].Snippet, nil
+}
+
+func (a *YouTubeAPI) GetChannelVideosPage(channelID string, publishedAfter int64, pageToken string) ([]SourceVideo, string, error) {
+	req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/search", nil)
+	if err != nil {
+		log.Errorf("Error creating http request for YouTube API: %v", err)
+		return []SourceVideo{}, "", err
+	}
+
+	query := req.URL.Query()
+	query.Add("part", "snippet")
+	query.Add("type", "video")
+	query.Add("channelId", channelID)
+	query.Add("publishedAfter", time.Unix(publishedAfter, 0).Format(time.RFC3339))
+	query.Add("maxResults", "5")
+	if pageToken != "" {
+		query.Add("pageToken", pageToken)
+	}
+	query.Add("key", a.apiKey)
+	req.URL.RawQuery = query.Encode()
+
+	req.Header.Add("Accept", "application/json")
+
+	resp, err := a.client.Do(req)
+	if err != nil {
+		log.Errorf("Error from YouTube API: %v", err)
+		return []SourceVideo{}, "", err
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	log.Tracef("Response from YouTube API: %s", string(body[:]))
+
+	var result videoSearchResponse
+	err = json.Unmarshal(body, &result)
+	if err != nil {
+		log.Errorf("Error deserializing video list response from YouTube API: %v", err)
+		return []SourceVideo{}, "", err
+	}
+
+	videos := []SourceVideo{}
+	for _, item := range result.Items {
+		var releaseTime
*int64 + publishedAt, err := time.Parse(time.RFC3339, item.Snippet.PublishedAt) + if err != nil { + log.Errorf("Unable to parse publish time of %s while scanning YouTube channel %s: %v", item.ID.VideoID, channelID) + releaseTime = nil + } else { + releaseTime = util.PtrToInt64(publishedAt.Unix()) + } + + video := SourceVideo { + ID: item.ID.VideoID, + Source: "YouTube", + ReleaseTime: releaseTime, + } + videos = append(videos, video) + } + + return videos, result.NextPageToken, nil +} + +type videoListResponse struct { + NextPageToken string `json:"nextPageToken"` + Items []struct { + ID string `json:"id"` + Snippet VideoSnippet `json:"snippet"` + } `json:"items"` +} + +type videoSearchResponse struct { + NextPageToken string `json:"nextPageToken"` + Items []struct { + ID struct{ + VideoID string `json:"videoId"` + } `json:"id"` + Snippet VideoSnippet `json:"snippet"` + } `json:"items"` +} + +type VideoSnippet struct { + PublishedAt string `json:"publishedAt"` +} diff --git a/local/ytdl.go b/local/ytdl.go new file mode 100644 index 0000000..7baa893 --- /dev/null +++ b/local/ytdl.go @@ -0,0 +1,239 @@ +package local + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "strings" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" +) + +type Ytdl struct { + DownloadDir string +} + +func NewYtdl(downloadDir string) (*Ytdl, error) { + // TODO validate download dir + + y := Ytdl { + DownloadDir: downloadDir, + } + + return &y, nil +} + +func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) { + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + return nil, err + } + + metadataBytes, err := os.ReadFile(metadataPath) + if err != nil { + return nil, err + } + + var metadata *ytdl.YtdlVideo + err = json.Unmarshal(metadataBytes, &metadata) + if err != nil { + return nil, err + } + + return metadata, nil +} + + +func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) { + basePath := path.Join(y.DownloadDir, videoID) + metadataPath := basePath + ".info.json" + + _, err := os.Stat(metadataPath) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if video metadata already exists: %v", err) + return "", err + } else if err != nil { + log.Debugf("Metadata file for video %s does not exist. 
Downloading now.", videoID) + err = downloadVideoMetadata(basePath, videoID) + if err != nil { + return "", err + } + } + + return metadataPath, nil +} + +func (y *Ytdl) GetVideoFile(videoID string) (string, error) { + videoPath, err := findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + return "", err + } + + if videoPath != nil { + return *videoPath, nil + } + + basePath := path.Join(y.DownloadDir, videoID) + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + log.Errorf("Error getting metadata path in preparation for video download: %v", err) + return "", err + } + err = downloadVideo(basePath, metadataPath) + if err != nil { + return "", nil + } + + videoPath, err = findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err) + return "", err + } + if videoPath == nil { + return "", errors.New("Could not find a downloaded video after successful download.") + } + + return *videoPath, nil +} + +func (y *Ytdl) DeleteVideoFiles(videoID string) error { + files, err := ioutil.ReadDir(y.DownloadDir) + if err != nil { + return err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if strings.Contains(f.Name(), videoID) { + videoPath := path.Join(y.DownloadDir, f.Name()) + err = os.Remove(videoPath) + if err != nil { + log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err) + return err + } + } + } + + return nil +} + +func deleteFile(path string) error { + _, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if file %s exists: %v", path, err) + return err + } else if err != nil { + log.Debugf("File %s does not exist. Skipping deletion.", path) + return nil + } + + return os.Remove(path) +} + +func findDownloadedVideo(videoDir, videoID string) (*string, error) { + files, err := ioutil.ReadDir(videoDir) + if err != nil { + return nil, err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) { + videoPath := path.Join(videoDir, f.Name()) + return &videoPath, nil + } + } + return nil, nil +} + +func downloadVideoMetadata(basePath, videoID string) error { + ytdlArgs := []string{ + "--skip-download", + "--write-info-json", + "--force-overwrites", + fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID), + "--cookies", + "cookies.txt", + "-o", + basePath, + } + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) + output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func downloadVideo(basePath, metadataPath string) error { + ytdlArgs := []string{ + "--no-progress", + "-o", + basePath, + "--merge-output-format", + "mp4", + "--postprocessor-args", + "ffmpeg:-movflags faststart", + "--abort-on-unavailable-fragment", + "--fragment-retries", + "1", + "--cookies", + "cookies.txt", + "--extractor-args", + "youtube:player_client=android", + "--load-info-json", + metadataPath, + "-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]", + } + + ytdlCmd := exec.Command("yt-dlp", ytdlArgs...) 
+ output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func runCmd(cmd *exec.Cmd) ([]string, error) { + log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) + var err error + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + outLog, err := ioutil.ReadAll(stdout) + if err != nil { + return nil, err + } + errorLog, err := ioutil.ReadAll(stderr) + if err != nil { + return nil, err + } + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + if err != nil { + log.Error(string(errorLog)) + return nil, err + } + return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil + } +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go new file mode 100644 index 0000000..2fd2a56 --- /dev/null +++ b/local/ytdlVideoSource.go @@ -0,0 +1,101 @@ +package local + +import ( + log "github.com/sirupsen/logrus" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" +) + +type YtdlVideoSource struct { + downloader Ytdl + channelScanner YouTubeChannelScanner + enrichers []YouTubeVideoEnricher +} + +func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig, syncDB *SyncDb) (*YtdlVideoSource, error) { + ytdl, err := NewYtdl(downloadDir) + if err != nil { + return nil, err + } + + source := YtdlVideoSource { + downloader: *ytdl, + } + + if syncDB != nil { + source.enrichers = append(source.enrichers, NewCacheVideoEnricher(syncDB)) + } + + if config.APIKey != "" { + ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey) + source.enrichers = append(source.enrichers, ytapiEnricher) + source.channelScanner = NewYouTubeAPIChannelScanner(config.APIKey, config.ChannelID) + } + + if source.channelScanner == nil { + log.Warnf("No means of scanning source channels has been provided") + } + + return &source, nil +} + +func (s *YtdlVideoSource) SourceName() string { + return "YouTube" +} + +func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { + metadata, err := s.downloader.GetVideoMetadata(id) + if err != nil { + return nil, err + } + + videoPath, err := s.downloader.GetVideoFile(id) + if err != nil { + return nil, err + } + + var bestThumbnail *ytdl.Thumbnail = nil + for i, thumbnail := range metadata.Thumbnails { + if i == 0 || bestThumbnail.Width < thumbnail.Width { + bestThumbnail = &thumbnail + } + } + + sourceVideo := SourceVideo { + ID: id, + Source: "YouTube", + Title: &metadata.Title, + Description: &metadata.Description, + SourceURL: "\nhttps://www.youtube.com/watch?v=" + id, + Languages: []string{}, + Tags: metadata.Tags, + ReleaseTime: nil, + ThumbnailURL: &bestThumbnail.URL, + FullLocalPath: &videoPath, + } + + for _, enricher := range s.enrichers { + err = enricher.EnrichMissing(&sourceVideo) + if err != nil { + log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err) + } + } + + log.Debugf("Source video retrieved via ytdl: %v", sourceVideo) + + return &sourceVideo, nil +} + +func (s *YtdlVideoSource) DeleteLocalCache(id string) error { + return s.downloader.DeleteVideoFiles(id) +} + +func (s *YtdlVideoSource) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult { + if s.channelScanner != nil { + return s.channelScanner.Scan(sinceTimestamp) + } + + videoCh := make(chan SourceScanIteratorResult, 1) + close(videoCh) + return videoCh +}
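
For reference, a minimal sketch of the VideoSource contract the files above rely on; nullVideoSource is a hypothetical stub, not part of this patch, shown only to illustrate what an alternative source implementation must provide: Scan delivers SourceScanIteratorResult values over a channel and closes it when finished, GetVideo returns populated metadata plus a local file path, and DeleteLocalCache removes whatever GetVideo cached.

package local

// nullVideoSource is a hypothetical, do-nothing VideoSource used only to
// illustrate the interface contract; it reports no videos.
type nullVideoSource struct{}

func (s *nullVideoSource) SourceName() string { return "null" }

func (s *nullVideoSource) GetVideo(id string) (*SourceVideo, error) {
	// A real source would also fill in Title, Description, ReleaseTime,
	// ThumbnailURL and FullLocalPath before returning.
	return &SourceVideo{ID: id, Source: s.SourceName()}, nil
}

func (s *nullVideoSource) DeleteLocalCache(id string) error { return nil }

func (s *nullVideoSource) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
	ch := make(chan SourceScanIteratorResult)
	close(ch) // nothing to report; consumers range over the channel until it is closed
	return ch
}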