def _rate_limit_download(self, url, video_id, note=None):
    """Fetch *url* with a rate-limited ``wget`` and return the body as text.

    The progress message mirrors the three-way ``note`` convention used by
    the call sites: ``None`` reports the default "downloading webpage"
    message, ``False`` prints nothing, and any other value is printed
    (prefixed with *video_id* when one is given).

    Raises ``subprocess.CalledProcessError`` when wget exits with a non-zero
    status (``check=True``).
    """
    if note is None:
        self.report_download_webpage(video_id)
    elif note is not False:
        if video_id is None:
            self.to_screen('%s' % (note,))
        else:
            self.to_screen('%s: %s' % (video_id, note))

    cmd = [self._WGET_BINARY, '-q', '--limit-rate', str(self._WGET_429_RATE_LIMIT)]
    # 'source_address' is optional; handing None to subprocess.run raises
    # TypeError, so only bind when an address was actually configured.
    source_address = self._downloader.params.get('source_address')
    if source_address:
        cmd += ['--bind-address', source_address]
    cmd += ['-O', '-', url]

    completed = subprocess.run(cmd, check=True, stdout=subprocess.PIPE)
    return completed.stdout.decode(encoding='UTF-8')
r'.*?-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player(?:-new)?|(?:/[a-z]{2,3}_[A-Z]{2})?/base)?\.(?P<ext>[a-z]+)$', -@@ -1678,7 +1693,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor): - - # Get video webpage - url = proto + '://www.youtube.com/watch?v=%s&gl=US&hl=en&has_verified=1&bpctr=9999999999' % video_id -- video_webpage = self._download_webpage(url, video_id) -+ video_webpage = self._rate_limit_download(url, video_id) - - # Attempt to extract SWF player URL - mobj = re.search(r'swfConfig.*?"(https?:\\/\\/.*?watch.*?-.*?\.swf)"', video_webpage) -@@ -1736,10 +1751,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor): - r'"sts"\s*:\s*(\d+)', embed_webpage, 'sts', default=''), - }) - video_info_url = proto + '://www.youtube.com/get_video_info?' + data -- video_info_webpage = self._download_webpage( -+ video_info_webpage = self._rate_limit_download( - video_info_url, video_id, -- note='Refetching age-gated info webpage', -- errnote='unable to download video info webpage') -+ note='Refetching age-gated info webpage') - video_info = compat_parse_qs(video_info_webpage) - pl_response = video_info.get('player_response', [None])[0] - player_response = extract_player_response(pl_response, video_id) -@@ -1777,7 +1791,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor): - # The general idea is to take a union of itags of both DASH manifests (for example - # video with such 'manifest behavior' see https://github.com/ytdl-org/youtube-dl/issues/6093) - self.report_video_info_webpage_download(video_id) -- for el in ('embedded', 'detailpage', 'vevo', ''): -+ for el in ('', 'embedded', 'detailpage', 'vevo'): - query = { - 'video_id': video_id, - 'ps': 'default', -@@ -1789,11 +1803,22 @@ class YoutubeIE(YoutubeBaseInfoExtractor): - query['el'] = el - if sts: - query['sts'] = sts -- video_info_webpage = self._download_webpage( -- '%s://www.youtube.com/get_video_info' % proto, -- video_id, note=False, -- errnote='unable to download video info webpage', -- fatal=False, query=query) -+ -+ if el 
== '': -+ base_url = 'https://youtube.com/get_video_info?video_id={}'.format(video_id) -+ else: -+ base_url = 'https://youtube.com/get_video_info' -+ -+ for q in query: -+ if q is None or q is "": -+ continue -+ if query[q] is None or query[q] is "": -+ continue -+ -+ base_url = base_url + "?{}={}".format(q, query[q]) -+ -+ video_info_webpage = self._rate_limit_download(base_url, video_id) -+ - if not video_info_webpage: - continue - get_video_info = compat_parse_qs(video_info_webpage) --- -2.17.1 - diff --git a/cmd/local.go b/cmd/local.go new file mode 100644 index 0000000..a8e4203 --- /dev/null +++ b/cmd/local.go @@ -0,0 +1,7 @@ +package cmd + +import "github.com/lbryio/ytsync/v5/local" + +func init() { + local.AddCommand(rootCmd) +} diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 0000000..92af824 --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,166 @@ +package cmd + +import ( + "os" + "time" + + "github.com/lbryio/ytsync/v5/manager" + "github.com/lbryio/ytsync/v5/sdk" + "github.com/lbryio/ytsync/v5/shared" + ytUtils "github.com/lbryio/ytsync/v5/util" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/util" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +const defaultMaxTries = 3 + +var ( + cliFlags shared.SyncFlags + maxVideoLength int +) + +var rootCmd = &cobra.Command{ + Use: "ytsync", + Short: "Publish youtube channels into LBRY network automatically.", + Run: ytSync, + Args: cobra.RangeArgs(0, 0), +} + +func init() { + rootCmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") + rootCmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") + rootCmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync") + rootCmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free 
space check on startup") + rootCmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") + rootCmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") + rootCmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") + rootCmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") + rootCmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports") + rootCmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube") + rootCmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update") + rootCmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.") + rootCmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.") + rootCmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") + rootCmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)") + rootCmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently") + rootCmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)") + rootCmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)") + rootCmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)") +} + +func 
Execute() { + err := rootCmd.Execute() + if err != nil { + log.Errorln(err) + os.Exit(1) + } +} + +func ytSync(cmd *cobra.Command, args []string) { + var hostname string + slackToken := os.Getenv("SLACK_TOKEN") + if slackToken == "" { + log.Error("A slack token was not present in env vars! Slack messages disabled!") + } else { + var err error + hostname, err = os.Hostname() + if err != nil { + log.Error("could not detect system hostname") + hostname = "ytsync-unknown" + } + if len(hostname) > 30 { + hostname = hostname[0:30] + } + + util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname) + } + + if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) { + log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses) + return + } + + if cliFlags.MaxTries < 1 { + log.Errorln("setting --max-tries less than 1 doesn't make sense") + return + } + + if cliFlags.Limit < 0 { + log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense") + return + } + cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour + + apiURL := os.Getenv("LBRY_WEB_API") + apiToken := os.Getenv("LBRY_API_TOKEN") + youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") + lbrycrdDsn := os.Getenv("LBRYCRD_STRING") + awsS3ID := os.Getenv("AWS_S3_ID") + awsS3Secret := os.Getenv("AWS_S3_SECRET") + awsS3Region := os.Getenv("AWS_S3_REGION") + awsS3Bucket := os.Getenv("AWS_S3_BUCKET") + if apiURL == "" { + log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API") + return + } + if apiToken == "" { + log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN") + return + } + if youtubeAPIKey == "" { + log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") + return + } + if awsS3ID == "" { + log.Errorln("AWS S3 ID credentials were not defined. 
Please set the environment variable AWS_S3_ID") + return + } + if awsS3Secret == "" { + log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET") + return + } + if awsS3Region == "" { + log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION") + return + } + if awsS3Bucket == "" { + log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET") + return + } + if lbrycrdDsn == "" { + log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") + } + + blobsDir := ytUtils.GetBlobsDir() + + apiConfig := &sdk.APIConfig{ + YoutubeAPIKey: youtubeAPIKey, + ApiURL: apiURL, + ApiToken: apiToken, + HostName: hostname, + } + awsConfig := &shared.AwsConfigs{ + AwsS3ID: awsS3ID, + AwsS3Secret: awsS3Secret, + AwsS3Region: awsS3Region, + AwsS3Bucket: awsS3Bucket, + } + sm := manager.NewSyncManager( + cliFlags, + blobsDir, + lbrycrdDsn, + awsConfig, + apiConfig, + ) + + err := sm.Start() + if err != nil { + ytUtils.SendErrorToSlack(errors.FullTrace(err)) + } + + ytUtils.SendInfoToSlack("Syncing process terminated!") +} diff --git a/local/local.go b/local/local.go new file mode 100644 index 0000000..9b7837e --- /dev/null +++ b/local/local.go @@ -0,0 +1,253 @@ +package local + +import ( + "errors" + "os" + "regexp" + "strings" + "time" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/abadojack/whatlanggo" + + "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/lbryio/ytsync/v5/namer" + "github.com/lbryio/ytsync/v5/tags_manager" +) + +type SyncContext struct { + DryRun bool + KeepCache bool + ReflectStreams bool + TempDir string + LbrynetAddr string + ChannelID string + PublishBid float64 + YouTubeSourceConfig *YouTubeSourceConfig +} + +func (c *SyncContext) Validate() error { + if c.TempDir == "" { + return errors.New("No TempDir provided") + } + if c.LbrynetAddr == "" { + return 
errors.New("No Lbrynet address provided") + } + if c.ChannelID == "" { + return errors.New("No channel ID provided") + } + if c.PublishBid <= 0.0 { + return errors.New("Publish bid is not greater than zero") + } + return nil +} + +type YouTubeSourceConfig struct { + YouTubeAPIKey string +} + +var syncContext SyncContext + +func AddCommand(rootCmd *cobra.Command) { + cmd := &cobra.Command{ + Use: "local", + Short: "run a personal ytsync", + Run: localCmd, + Args: cobra.ExactArgs(1), + } + cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") + cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") + cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") + cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") + cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") + cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") + + // For now, assume source is always YouTube + syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{} + cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key") + rootCmd.AddCommand(cmd) +} + +func getEnvDefault(key, defaultValue string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return defaultValue +} + +func localCmd(cmd *cobra.Command, args []string) { + err := syncContext.Validate() + if err != nil { + log.Error(err) + return + } + videoID := args[0] + + log.Debugf("Running sync for video ID %s", videoID) + + 
var publisher VideoPublisher + publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) + if err != nil { + log.Errorf("Error setting up publisher: %v", err) + return + } + + var videoSource VideoSource + if syncContext.YouTubeSourceConfig != nil { + videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig) + if err != nil { + log.Errorf("Error setting up video source: %v", err) + return + } + } + + sourceVideo, err := videoSource.GetVideo(videoID) + if err != nil { + log.Errorf("Error getting source video: %v", err) + return + } + + processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) + if err != nil { + log.Errorf("Error processing source video for publishing: %v", err) + return + } + + if syncContext.DryRun { + log.Infoln("This is a dry run. Nothing will be published.") + log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName) + log.Debugf("Object to be published: %v", processedVideo) + + } else { + doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) + if err != nil { + log.Errorf("Error publishing video: %v", err) + return + } + + if syncContext.ReflectStreams { + err = <-doneReflectingCh + if err != nil { + log.Errorf("Error while wating for stream to reflect: %v", err) + } + } else { + log.Debugln("Not waiting for stream to reflect.") + } + } + + if !syncContext.KeepCache { + log.Infof("Deleting local files.") + err = videoSource.DeleteLocalCache(videoID) + if err != nil { + log.Errorf("Error deleting local files for video %s: %v", videoID, err) + } + } + log.Info("Done") +} + +type SourceVideo struct { + ID string + Title *string + Description *string + SourceURL string + Languages []string + Tags []string + ReleaseTime *int64 + ThumbnailURL *string + FullLocalPath string +} + +type PublishableVideo struct { + 
ID string + ClaimName string + Title string + Description string + SourceURL string + Languages []string + Tags []string + ReleaseTime int64 + ThumbnailURL string + FullLocalPath string +} + +func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) { + tags, err := tags_manager.SanitizeTags(source.Tags, channelID) + if err != nil { + log.Errorf("Error sanitizing tags: %v", err) + return nil, err + } + + descriptionSample := "" + if source.Description != nil { + urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) + descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "") + } + info := whatlanggo.Detect(descriptionSample) + + title := "" + if source.Title != nil { + title = *source.Title + } + info2 := whatlanggo.Detect(title) + var languages []string = nil + if info.IsReliable() && info.Lang.Iso6391() != "" { + language := info.Lang.Iso6391() + languages = []string{language} + } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { + language := info2.Lang.Iso6391() + languages = []string{language} + } + + claimName := namer.NewNamer().GetNextName(title) + + thumbnailURL := source.ThumbnailURL + if thumbnailURL == nil { + thumbnailURL = util.PtrToString("") + } + + releaseTime := source.ReleaseTime + if releaseTime == nil { + releaseTime = util.PtrToInt64(time.Now().Unix()) + } + + processed := PublishableVideo { + ClaimName: claimName, + Title: title, + Description: getAbbrevDescription(source), + Languages: languages, + Tags: tags, + ReleaseTime: *releaseTime, + ThumbnailURL: *thumbnailURL, + FullLocalPath: source.FullLocalPath, + } + + log.Debugf("Video prepared for publication: %v", processed) + + return &processed, nil +} + +func getAbbrevDescription(v SourceVideo) string { + if v.Description == nil { + return v.SourceURL + } + + additionalDescription := "\n...\n" + v.SourceURL + maxLength := 2800 - len(additionalDescription) + + description := strings.TrimSpace(*v.Description) + 
if len(description) > maxLength { + description = description[:maxLength] + } + return description + additionalDescription +} + +type VideoSource interface { + GetVideo(id string) (*SourceVideo, error) + DeleteLocalCache(id string) error +} + +type VideoPublisher interface { + Publish(video PublishableVideo, reflectStream bool) (chan error, error) +} diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go new file mode 100644 index 0000000..3886eb9 --- /dev/null +++ b/local/localSDKPublisher.go @@ -0,0 +1,125 @@ +package local + +import ( + "errors" + "sort" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type LocalSDKPublisher struct { + channelID string + publishBid float64 + lbrynet *jsonrpc.Client +} + +func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) { + lbrynet := jsonrpc.NewClient(sdkAddr) + lbrynet.SetRPCTimeout(5 * time.Minute) + + status, err := lbrynet.Status() + if err != nil { + return nil, err + } + + if !status.IsRunning { + return nil, errors.New("SDK is not running") + } + + // Should check to see if the SDK owns the channel + + // Should check to see if wallet is unlocked + // but jsonrpc.Client doesn't have WalletStatus method + // so skip for now + + // Should check to see if streams are configured to be reflected and warn if not + // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected + // so use File.UploadingToReflector as a proxy for now + + publisher := LocalSDKPublisher { + channelID: channelID, + publishBid: publishBid, + lbrynet: lbrynet, + } + return &publisher, nil +} + +func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) { + streamCreateOptions := jsonrpc.StreamCreateOptions { + ClaimCreateOptions: jsonrpc.ClaimCreateOptions { + Title: &video.Title, + Description: &video.Description, + Languages: 
video.Languages, + ThumbnailURL: &video.ThumbnailURL, + Tags: video.Tags, + }, + ReleaseTime: &video.ReleaseTime, + ChannelID: &p.channelID, + License: util.PtrToString("Copyrighted (contact publisher)"), + } + + txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) + if err != nil { + return nil, err + } + + if !reflectStream { + return nil, nil + } + + done := make(chan error, 1) + go func() { + for { + fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid) + if err != nil { + log.Errorf("Error finding file by txid: %v", err) + done <- err + return + } + if fileListResponse == nil { + log.Errorf("Could not find file in list with correct txid") + done <- err + return + } + + fileStatus := fileListResponse.Items[fileIndex] + if fileStatus.IsFullyReflected { + log.Info("Stream is fully reflected") + break + } + if !fileStatus.UploadingToReflector { + log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") + done <- errors.New("Stream is not being reflected (check lbrynet settings).") + return + } + log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) + time.Sleep(5 * time.Second) + } + done <- nil + }() + + return done, nil +} + +// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed +func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { + response, err := client.FileList(0, 20) + for { + if err != nil { + log.Errorf("Error getting file list page: %v", err) + return nil, 0, err + } + index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) + if index < len(response.Items) { + return response, index, nil + } + if response.Page >= response.TotalPages { + return nil, 0, nil + } + response, err = client.FileList(response.Page + 1, 20) + } +} diff --git a/local/readme.md 
b/local/readme.md new file mode 100644 index 0000000..81b0235 --- /dev/null +++ b/local/readme.md @@ -0,0 +1,69 @@ +# Running ytsync locally + +## Requirements + +- LBRY SDK (what do we actually need this for?) +- youtube-dl +- enough space to cache stuff +- YouTube data API key + + +## Process + +### Ensuring requirements are met + +- claim channel if there isn't one yet + - or easier, just error if no channel +- enough lbc in wallet? + +### Getting a YouTube API key + +To access the YouTube data API, you will first need some kind of google account. + +The API has two methods of authentication, OAuth2 and API keys. This application uses API keys. +These API keys are basically like passwords, and so once obtained, they should not be shared. + +The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application): + + +1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console. +2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**. + +To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys). 
+ +### Options to figure out what's already synced + +- simplest: assume nothing is synced yet +- assume everything before some video is synced +- get/put sync info from Odysee by proving you have private key for channel +- tag videos as having been synced from youtube so we can ensure accuracy +- hardest: scan channel and try to match up which videos are not synced yet + +### Central DB + +- prove you have a channel's private key to get info about that channel +- proper queue instead of sleeping for N minutes between syncs + + + +### Syncing a single video + +- downloading it +- thumbnails +- metadata +- having enough LBC for publish(es) +- automated error handling +- getting a human involved for errors that can't be handled automatically +- reflecting + +### Continuous Sync + +- running in background +- storing local state +- interactions with our central ytsync db +- dealing with yt throttling + + +### Debugging + +- dry-running the whole thing diff --git a/local/youtubeEnricher.go b/local/youtubeEnricher.go new file mode 100644 index 0000000..38369b5 --- /dev/null +++ b/local/youtubeEnricher.go @@ -0,0 +1,45 @@ +package local + +import ( + "time" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/v2/extras/util" +) + +type YouTubeVideoEnricher interface { + EnrichMissing(source *SourceVideo) error +} + +type YouTubeAPIVideoEnricher struct { + api *YouTubeAPI +} + +func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) { + enricher := YouTubeAPIVideoEnricher{ + api: NewYouTubeAPI(apiKey), + } + return &enricher +} + +func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error { + if source.ReleaseTime != nil { + log.Debugf("Video %s does not need enrichment. 
YouTubeAPIVideoEnricher is skipping.", source.ID) + return nil + } + + snippet, err := e.api.GetVideoSnippet(source.ID) + if err != nil { + log.Errorf("Error snippet data for video %s: %v", err) + return err + } + + publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt) + if err != nil { + log.Errorf("Error converting publishedAt to timestamp: %v", err) + } else { + source.ReleaseTime = util.PtrToInt64(publishedAt.Unix()) + } + return nil +} diff --git a/local/ytapi.go b/local/ytapi.go new file mode 100644 index 0000000..1924446 --- /dev/null +++ b/local/ytapi.go @@ -0,0 +1,83 @@ +package local + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + log "github.com/sirupsen/logrus" +) + +type YouTubeAPI struct { + apiKey string + client *http.Client +} + +func NewYouTubeAPI(apiKey string) (*YouTubeAPI) { + client := &http.Client { + Transport: &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + DisableCompression: true, + }, + } + + api := YouTubeAPI { + apiKey: apiKey, + client: client, + } + + return &api +} + +func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) { + req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil) + if err != nil { + log.Errorf("Error creating http client for YouTube API: %v", err) + return nil, err + } + + query := req.URL.Query() + query.Add("part", "snippet") + query.Add("id", videoID) + query.Add("key", a.apiKey) + req.URL.RawQuery = query.Encode() + + req.Header.Add("Accept", "application/json") + + resp, err := a.client.Do(req) + defer resp.Body.Close() + if err != nil { + log.Errorf("Error from YouTube API: %v", err) + return nil, err + } + + body, err := io.ReadAll(resp.Body) + log.Tracef("Response from YouTube API: %s", string(body[:])) + + var result videoListResponse + err = json.Unmarshal(body, &result) + if err != nil { + log.Errorf("Error deserializing video list response from YouTube API: %v", err) + return nil, err 
+ } + + if len(result.Items) != 1 { + err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID) + return nil, err + } + + return &result.Items[0].Snippet, nil +} + +type videoListResponse struct { + Items []struct { + Snippet VideoSnippet `json:"snippet"` + } `json:"items"` +} + +type VideoSnippet struct { + PublishedAt string `json:"publishedAt"` +} diff --git a/local/ytdl.go b/local/ytdl.go new file mode 100644 index 0000000..7baa893 --- /dev/null +++ b/local/ytdl.go @@ -0,0 +1,239 @@ +package local + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "strings" + + log "github.com/sirupsen/logrus" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" +) + +type Ytdl struct { + DownloadDir string +} + +func NewYtdl(downloadDir string) (*Ytdl, error) { + // TODO validate download dir + + y := Ytdl { + DownloadDir: downloadDir, + } + + return &y, nil +} + +func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) { + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + return nil, err + } + + metadataBytes, err := os.ReadFile(metadataPath) + if err != nil { + return nil, err + } + + var metadata *ytdl.YtdlVideo + err = json.Unmarshal(metadataBytes, &metadata) + if err != nil { + return nil, err + } + + return metadata, nil +} + + +func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) { + basePath := path.Join(y.DownloadDir, videoID) + metadataPath := basePath + ".info.json" + + _, err := os.Stat(metadataPath) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if video metadata already exists: %v", err) + return "", err + } else if err != nil { + log.Debugf("Metadata file for video %s does not exist. 
Downloading now.", videoID) + err = downloadVideoMetadata(basePath, videoID) + if err != nil { + return "", err + } + } + + return metadataPath, nil +} + +func (y *Ytdl) GetVideoFile(videoID string) (string, error) { + videoPath, err := findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + return "", err + } + + if videoPath != nil { + return *videoPath, nil + } + + basePath := path.Join(y.DownloadDir, videoID) + metadataPath, err := y.GetVideoMetadataFile(videoID) + if err != nil { + log.Errorf("Error getting metadata path in preparation for video download: %v", err) + return "", err + } + err = downloadVideo(basePath, metadataPath) + if err != nil { + return "", nil + } + + videoPath, err = findDownloadedVideo(y.DownloadDir, videoID) + if err != nil { + log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err) + return "", err + } + if videoPath == nil { + return "", errors.New("Could not find a downloaded video after successful download.") + } + + return *videoPath, nil +} + +func (y *Ytdl) DeleteVideoFiles(videoID string) error { + files, err := ioutil.ReadDir(y.DownloadDir) + if err != nil { + return err + } + + for _, f := range files { + if f.IsDir() { + continue + } + if strings.Contains(f.Name(), videoID) { + videoPath := path.Join(y.DownloadDir, f.Name()) + err = os.Remove(videoPath) + if err != nil { + log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err) + return err + } + } + } + + return nil +} + +func deleteFile(path string) error { + _, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + log.Errorf("Error determining if file %s exists: %v", path, err) + return err + } else if err != nil { + log.Debugf("File %s does not exist. 
// findDownloadedVideo looks in videoDir for the first regular file that has
// the ".mp4" extension and whose name contains videoID.
//
// It returns a pointer to the joined path when such a file exists, (nil, nil)
// when there is none, and (nil, err) when the directory cannot be read.
func findDownloadedVideo(videoDir, videoID string) (*string, error) {
	entries, readErr := ioutil.ReadDir(videoDir)
	if readErr != nil {
		return nil, readErr
	}

	for i := range entries {
		name := entries[i].Name()
		isCandidate := !entries[i].IsDir() &&
			path.Ext(name) == ".mp4" &&
			strings.Contains(name, videoID)
		if !isCandidate {
			continue
		}
		found := path.Join(videoDir, name)
		return &found, nil
	}
	return nil, nil
}
+ output, err := runCmd(ytdlCmd) + log.Debug(output) + return err +} + +func runCmd(cmd *exec.Cmd) ([]string, error) { + log.Infof("running cmd: %s", strings.Join(cmd.Args, " ")) + var err error + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + outLog, err := ioutil.ReadAll(stdout) + if err != nil { + return nil, err + } + errorLog, err := ioutil.ReadAll(stderr) + if err != nil { + return nil, err + } + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + if err != nil { + log.Error(string(errorLog)) + return nil, err + } + return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil + } +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go new file mode 100644 index 0000000..b68c18f --- /dev/null +++ b/local/ytdlVideoSource.go @@ -0,0 +1,76 @@ +package local + +import ( + log "github.com/sirupsen/logrus" + + "github.com/lbryio/ytsync/v5/downloader/ytdl" +) + +type YtdlVideoSource struct { + downloader Ytdl + enrichers []YouTubeVideoEnricher +} + +func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) { + ytdl, err := NewYtdl(downloadDir) + if err != nil { + return nil, err + } + + source := YtdlVideoSource { + downloader: *ytdl, + } + + if config.YouTubeAPIKey != "" { + ytapiEnricher := NewYouTubeAPIVideoEnricher(config.YouTubeAPIKey) + source.enrichers = append(source.enrichers, ytapiEnricher) + } + + return &source, nil +} + +func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { + metadata, err := s.downloader.GetVideoMetadata(id) + if err != nil { + return nil, err + } + + videoPath, err := s.downloader.GetVideoFile(id) + if err != nil { + return nil, err + } + + var bestThumbnail *ytdl.Thumbnail = nil + for i, thumbnail := range metadata.Thumbnails { + if i 
== 0 || bestThumbnail.Width < thumbnail.Width { + bestThumbnail = &thumbnail + } + } + + sourceVideo := SourceVideo { + ID: id, + Title: &metadata.Title, + Description: &metadata.Description, + SourceURL: "\nhttps://www.youtube.com/watch?v=" + id, + Languages: []string{}, + Tags: metadata.Tags, + ReleaseTime: nil, + ThumbnailURL: &bestThumbnail.URL, + FullLocalPath: videoPath, + } + + for _, enricher := range s.enrichers { + err = enricher.EnrichMissing(&sourceVideo) + if err != nil { + log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err) + } + } + + log.Debugf("Source video retrieved via ytdl: %v", sourceVideo) + + return &sourceVideo, nil +} + +func (s *YtdlVideoSource) DeleteLocalCache(id string) error { + return s.downloader.DeleteVideoFiles(id) +} diff --git a/main.go b/main.go index 9e93c59..1745380 100644 --- a/main.go +++ b/main.go @@ -1,172 +1,24 @@ package main import ( - "fmt" "math/rand" "net/http" - "os" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/lbryio/ytsync/v5/manager" - "github.com/lbryio/ytsync/v5/sdk" - "github.com/lbryio/ytsync/v5/shared" - ytUtils "github.com/lbryio/ytsync/v5/util" + "github.com/lbryio/ytsync/v5/cmd" + "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" -) - -var Version string - -const defaultMaxTries = 3 - -var ( - cliFlags shared.SyncFlags - maxVideoLength int ) func main() { rand.Seed(time.Now().UnixNano()) log.SetLevel(log.DebugLevel) - http.Handle("/metrics", promhttp.Handler()) + go func() { + http.Handle("/metrics", promhttp.Handler()) log.Error(http.ListenAndServe(":2112", nil)) }() - cmd := &cobra.Command{ - Use: "ytsync", - Short: "Publish youtube channels into LBRY network automatically.", - Run: ytSync, - Args: cobra.RangeArgs(0, 0), - } - cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") - 
cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") - cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync") - cmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") - cmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") - cmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") - cmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") - cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") - cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports") - cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube") - cmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. 
Overrides --update") - cmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.") - cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.") - cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") - cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)") - cmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently") - cmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)") - cmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)") - cmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)") - - if err := cmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) - } -} - -func ytSync(cmd *cobra.Command, args []string) { - var hostname string - slackToken := os.Getenv("SLACK_TOKEN") - if slackToken == "" { - log.Error("A slack token was not present in env vars! 
Slack messages disabled!") - } else { - var err error - hostname, err = os.Hostname() - if err != nil { - log.Error("could not detect system hostname") - hostname = "ytsync-unknown" - } - if len(hostname) > 30 { - hostname = hostname[0:30] - } - - util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname) - } - - if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) { - log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses) - return - } - - if cliFlags.MaxTries < 1 { - log.Errorln("setting --max-tries less than 1 doesn't make sense") - return - } - - if cliFlags.Limit < 0 { - log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense") - return - } - cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour - - apiURL := os.Getenv("LBRY_WEB_API") - apiToken := os.Getenv("LBRY_API_TOKEN") - youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") - lbrycrdDsn := os.Getenv("LBRYCRD_STRING") - awsS3ID := os.Getenv("AWS_S3_ID") - awsS3Secret := os.Getenv("AWS_S3_SECRET") - awsS3Region := os.Getenv("AWS_S3_REGION") - awsS3Bucket := os.Getenv("AWS_S3_BUCKET") - if apiURL == "" { - log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API") - return - } - if apiToken == "" { - log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN") - return - } - if youtubeAPIKey == "" { - log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") - return - } - if awsS3ID == "" { - log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID") - return - } - if awsS3Secret == "" { - log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET") - return - } - if awsS3Region == "" { - log.Errorln("AWS S3 Region was not defined. 
Please set the environment variable AWS_S3_REGION") - return - } - if awsS3Bucket == "" { - log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET") - return - } - if lbrycrdDsn == "" { - log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") - } - - blobsDir := ytUtils.GetBlobsDir() - - apiConfig := &sdk.APIConfig{ - YoutubeAPIKey: youtubeAPIKey, - ApiURL: apiURL, - ApiToken: apiToken, - HostName: hostname, - } - awsConfig := &shared.AwsConfigs{ - AwsS3ID: awsS3ID, - AwsS3Secret: awsS3Secret, - AwsS3Region: awsS3Region, - AwsS3Bucket: awsS3Bucket, - } - sm := manager.NewSyncManager( - cliFlags, - blobsDir, - lbrycrdDsn, - awsConfig, - apiConfig, - ) - err := sm.Start() - if err != nil { - ytUtils.SendErrorToSlack(errors.FullTrace(err)) - } - ytUtils.SendInfoToSlack("Syncing process terminated!") + cmd.Execute() } diff --git a/manager/manager.go b/manager/manager.go index e01ffc6..2e50f84 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -58,9 +58,12 @@ func (s *SyncManager) Start() error { } } - var lastChannelProcessed string - var secondLastChannelProcessed string - syncCount := 0 + var ( + lastChannelProcessed string + secondLastChannelProcessed string + syncCount int + ) + for { s.channelsToSync = make([]Sync, 0, 10) // reset sync queue err := s.checkUsedSpace() @@ -108,10 +111,12 @@ func (s *SyncManager) Start() error { log.Infof("Drained the \"%s\" queue", q) } } + if len(s.channelsToSync) == 0 { log.Infoln("No channels to sync. Pausing 5 minutes!") time.Sleep(5 * time.Minute) } + for _, sync := range s.channelsToSync { if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed { util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId) @@ -174,6 +179,7 @@ func (s *SyncManager) Start() error { break } } + return nil }