Compare commits
8 commits
master
...
pseudoscal
Author | SHA1 | Date | |
---|---|---|---|
|
ce901f6b01 | ||
|
9f6b15e841 | ||
|
e564dc8445 | ||
|
eb30fa4299 | ||
|
2ba960ae01 | ||
|
8ea15afce8 | ||
|
b9bf2f6e73 | ||
|
e554bbfe18 |
12 changed files with 1077 additions and 267 deletions
|
@ -1,111 +0,0 @@
|
||||||
From 30380338ba9af01696c94b61f0597131638eaec1 Mon Sep 17 00:00:00 2001
|
|
||||||
From: Niko Storni <niko@lbry.io>
|
|
||||||
Date: Mon, 16 Dec 2019 00:13:36 +0100
|
|
||||||
Subject: [PATCH] lbry-patch
|
|
||||||
|
|
||||||
---
|
|
||||||
youtube_dl/extractor/youtube.py | 45 +++++++++++++++++++++++++--------
|
|
||||||
1 file changed, 35 insertions(+), 10 deletions(-)
|
|
||||||
|
|
||||||
diff --git a/youtube_dl/extractor/youtube.py b/youtube_dl/extractor/youtube.py
|
|
||||||
index b913d07a6..cd66a5b01 100644
|
|
||||||
--- a/youtube_dl/extractor/youtube.py
|
|
||||||
+++ b/youtube_dl/extractor/youtube.py
|
|
||||||
@@ -10,6 +10,7 @@ import random
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
import traceback
|
|
||||||
+import subprocess
|
|
||||||
|
|
||||||
from .common import InfoExtractor, SearchInfoExtractor
|
|
||||||
from ..jsinterp import JSInterpreter
|
|
||||||
@@ -536,6 +537,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
|
|
||||||
|
|
||||||
_GEO_BYPASS = False
|
|
||||||
|
|
||||||
+ _WGET_429_RATE_LIMIT = 8191
|
|
||||||
+ _WGET_BINARY = "wget"
|
|
||||||
+
|
|
||||||
IE_NAME = 'youtube'
|
|
||||||
_TESTS = [
|
|
||||||
{
|
|
||||||
@@ -1254,6 +1258,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
|
|
||||||
""" Return a string representation of a signature """
|
|
||||||
return '.'.join(compat_str(len(part)) for part in example_sig.split('.'))
|
|
||||||
|
|
||||||
+ def _rate_limit_download(self, url, video_id, note=None):
|
|
||||||
+ if note is None:
|
|
||||||
+ self.report_download_webpage(video_id)
|
|
||||||
+ elif note is not False:
|
|
||||||
+ if video_id is None:
|
|
||||||
+ self.to_screen('%s' % (note,))
|
|
||||||
+ else:
|
|
||||||
+ self.to_screen('%s: %s' % (video_id, note))
|
|
||||||
+ source_address = self._downloader.params.get('source_address')
|
|
||||||
+ return subprocess.run([self._WGET_BINARY, '-q', '--limit-rate', str(self._WGET_429_RATE_LIMIT), '--bind-address', source_address, '-O', '-', url], check=True, stdout=subprocess.PIPE).stdout.decode(encoding='UTF-8')
|
|
||||||
+
|
|
||||||
def _extract_signature_function(self, video_id, player_url, example_sig):
|
|
||||||
id_m = re.match(
|
|
||||||
r'.*?-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player(?:-new)?|(?:/[a-z]{2,3}_[A-Z]{2})?/base)?\.(?P<ext>[a-z]+)$',
|
|
||||||
@@ -1678,7 +1693,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
|
|
||||||
|
|
||||||
# Get video webpage
|
|
||||||
url = proto + '://www.youtube.com/watch?v=%s&gl=US&hl=en&has_verified=1&bpctr=9999999999' % video_id
|
|
||||||
- video_webpage = self._download_webpage(url, video_id)
|
|
||||||
+ video_webpage = self._rate_limit_download(url, video_id)
|
|
||||||
|
|
||||||
# Attempt to extract SWF player URL
|
|
||||||
mobj = re.search(r'swfConfig.*?"(https?:\\/\\/.*?watch.*?-.*?\.swf)"', video_webpage)
|
|
||||||
@@ -1736,10 +1751,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
|
|
||||||
r'"sts"\s*:\s*(\d+)', embed_webpage, 'sts', default=''),
|
|
||||||
})
|
|
||||||
video_info_url = proto + '://www.youtube.com/get_video_info?' + data
|
|
||||||
- video_info_webpage = self._download_webpage(
|
|
||||||
+ video_info_webpage = self._rate_limit_download(
|
|
||||||
video_info_url, video_id,
|
|
||||||
- note='Refetching age-gated info webpage',
|
|
||||||
- errnote='unable to download video info webpage')
|
|
||||||
+ note='Refetching age-gated info webpage')
|
|
||||||
video_info = compat_parse_qs(video_info_webpage)
|
|
||||||
pl_response = video_info.get('player_response', [None])[0]
|
|
||||||
player_response = extract_player_response(pl_response, video_id)
|
|
||||||
@@ -1777,7 +1791,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
|
|
||||||
# The general idea is to take a union of itags of both DASH manifests (for example
|
|
||||||
# video with such 'manifest behavior' see https://github.com/ytdl-org/youtube-dl/issues/6093)
|
|
||||||
self.report_video_info_webpage_download(video_id)
|
|
||||||
- for el in ('embedded', 'detailpage', 'vevo', ''):
|
|
||||||
+ for el in ('', 'embedded', 'detailpage', 'vevo'):
|
|
||||||
query = {
|
|
||||||
'video_id': video_id,
|
|
||||||
'ps': 'default',
|
|
||||||
@@ -1789,11 +1803,22 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
|
|
||||||
query['el'] = el
|
|
||||||
if sts:
|
|
||||||
query['sts'] = sts
|
|
||||||
- video_info_webpage = self._download_webpage(
|
|
||||||
- '%s://www.youtube.com/get_video_info' % proto,
|
|
||||||
- video_id, note=False,
|
|
||||||
- errnote='unable to download video info webpage',
|
|
||||||
- fatal=False, query=query)
|
|
||||||
+
|
|
||||||
+ if el == '':
|
|
||||||
+ base_url = 'https://youtube.com/get_video_info?video_id={}'.format(video_id)
|
|
||||||
+ else:
|
|
||||||
+ base_url = 'https://youtube.com/get_video_info'
|
|
||||||
+
|
|
||||||
+ for q in query:
|
|
||||||
+ if q is None or q is "":
|
|
||||||
+ continue
|
|
||||||
+ if query[q] is None or query[q] is "":
|
|
||||||
+ continue
|
|
||||||
+
|
|
||||||
+ base_url = base_url + "?{}={}".format(q, query[q])
|
|
||||||
+
|
|
||||||
+ video_info_webpage = self._rate_limit_download(base_url, video_id)
|
|
||||||
+
|
|
||||||
if not video_info_webpage:
|
|
||||||
continue
|
|
||||||
get_video_info = compat_parse_qs(video_info_webpage)
|
|
||||||
--
|
|
||||||
2.17.1
|
|
||||||
|
|
7
cmd/local.go
Normal file
7
cmd/local.go
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import "github.com/lbryio/ytsync/v5/local"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
local.AddCommand(rootCmd)
|
||||||
|
}
|
166
cmd/root.go
Normal file
166
cmd/root.go
Normal file
|
@ -0,0 +1,166 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lbryio/ytsync/v5/manager"
|
||||||
|
"github.com/lbryio/ytsync/v5/sdk"
|
||||||
|
"github.com/lbryio/ytsync/v5/shared"
|
||||||
|
ytUtils "github.com/lbryio/ytsync/v5/util"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
const defaultMaxTries = 3
|
||||||
|
|
||||||
|
var (
|
||||||
|
cliFlags shared.SyncFlags
|
||||||
|
maxVideoLength int
|
||||||
|
)
|
||||||
|
|
||||||
|
var rootCmd = &cobra.Command{
|
||||||
|
Use: "ytsync",
|
||||||
|
Short: "Publish youtube channels into LBRY network automatically.",
|
||||||
|
Run: ytSync,
|
||||||
|
Args: cobra.RangeArgs(0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
rootCmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
|
||||||
|
rootCmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
|
||||||
|
rootCmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
|
||||||
|
rootCmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
|
||||||
|
rootCmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
|
||||||
|
rootCmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
|
||||||
|
rootCmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
|
||||||
|
rootCmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
|
||||||
|
rootCmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
|
||||||
|
rootCmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
|
||||||
|
rootCmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
|
||||||
|
rootCmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
|
||||||
|
}
|
||||||
|
|
||||||
|
func Execute() {
|
||||||
|
err := rootCmd.Execute()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorln(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ytSync(cmd *cobra.Command, args []string) {
|
||||||
|
var hostname string
|
||||||
|
slackToken := os.Getenv("SLACK_TOKEN")
|
||||||
|
if slackToken == "" {
|
||||||
|
log.Error("A slack token was not present in env vars! Slack messages disabled!")
|
||||||
|
} else {
|
||||||
|
var err error
|
||||||
|
hostname, err = os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("could not detect system hostname")
|
||||||
|
hostname = "ytsync-unknown"
|
||||||
|
}
|
||||||
|
if len(hostname) > 30 {
|
||||||
|
hostname = hostname[0:30]
|
||||||
|
}
|
||||||
|
|
||||||
|
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
|
||||||
|
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if cliFlags.MaxTries < 1 {
|
||||||
|
log.Errorln("setting --max-tries less than 1 doesn't make sense")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if cliFlags.Limit < 0 {
|
||||||
|
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
|
||||||
|
|
||||||
|
apiURL := os.Getenv("LBRY_WEB_API")
|
||||||
|
apiToken := os.Getenv("LBRY_API_TOKEN")
|
||||||
|
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
|
||||||
|
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
|
||||||
|
awsS3ID := os.Getenv("AWS_S3_ID")
|
||||||
|
awsS3Secret := os.Getenv("AWS_S3_SECRET")
|
||||||
|
awsS3Region := os.Getenv("AWS_S3_REGION")
|
||||||
|
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
|
||||||
|
if apiURL == "" {
|
||||||
|
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if apiToken == "" {
|
||||||
|
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if youtubeAPIKey == "" {
|
||||||
|
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if awsS3ID == "" {
|
||||||
|
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if awsS3Secret == "" {
|
||||||
|
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if awsS3Region == "" {
|
||||||
|
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if awsS3Bucket == "" {
|
||||||
|
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if lbrycrdDsn == "" {
|
||||||
|
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
|
||||||
|
}
|
||||||
|
|
||||||
|
blobsDir := ytUtils.GetBlobsDir()
|
||||||
|
|
||||||
|
apiConfig := &sdk.APIConfig{
|
||||||
|
YoutubeAPIKey: youtubeAPIKey,
|
||||||
|
ApiURL: apiURL,
|
||||||
|
ApiToken: apiToken,
|
||||||
|
HostName: hostname,
|
||||||
|
}
|
||||||
|
awsConfig := &shared.AwsConfigs{
|
||||||
|
AwsS3ID: awsS3ID,
|
||||||
|
AwsS3Secret: awsS3Secret,
|
||||||
|
AwsS3Region: awsS3Region,
|
||||||
|
AwsS3Bucket: awsS3Bucket,
|
||||||
|
}
|
||||||
|
sm := manager.NewSyncManager(
|
||||||
|
cliFlags,
|
||||||
|
blobsDir,
|
||||||
|
lbrycrdDsn,
|
||||||
|
awsConfig,
|
||||||
|
apiConfig,
|
||||||
|
)
|
||||||
|
|
||||||
|
err := sm.Start()
|
||||||
|
if err != nil {
|
||||||
|
ytUtils.SendErrorToSlack(errors.FullTrace(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
ytUtils.SendInfoToSlack("Syncing process terminated!")
|
||||||
|
}
|
253
local/local.go
Normal file
253
local/local.go
Normal file
|
@ -0,0 +1,253 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/abadojack/whatlanggo"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
"github.com/lbryio/ytsync/v5/namer"
|
||||||
|
"github.com/lbryio/ytsync/v5/tags_manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncContext struct {
|
||||||
|
DryRun bool
|
||||||
|
KeepCache bool
|
||||||
|
ReflectStreams bool
|
||||||
|
TempDir string
|
||||||
|
LbrynetAddr string
|
||||||
|
ChannelID string
|
||||||
|
PublishBid float64
|
||||||
|
YouTubeSourceConfig *YouTubeSourceConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncContext) Validate() error {
|
||||||
|
if c.TempDir == "" {
|
||||||
|
return errors.New("No TempDir provided")
|
||||||
|
}
|
||||||
|
if c.LbrynetAddr == "" {
|
||||||
|
return errors.New("No Lbrynet address provided")
|
||||||
|
}
|
||||||
|
if c.ChannelID == "" {
|
||||||
|
return errors.New("No channel ID provided")
|
||||||
|
}
|
||||||
|
if c.PublishBid <= 0.0 {
|
||||||
|
return errors.New("Publish bid is not greater than zero")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type YouTubeSourceConfig struct {
|
||||||
|
YouTubeAPIKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
var syncContext SyncContext
|
||||||
|
|
||||||
|
func AddCommand(rootCmd *cobra.Command) {
|
||||||
|
cmd := &cobra.Command{
|
||||||
|
Use: "local",
|
||||||
|
Short: "run a personal ytsync",
|
||||||
|
Run: localCmd,
|
||||||
|
Args: cobra.ExactArgs(1),
|
||||||
|
}
|
||||||
|
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
|
||||||
|
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
|
||||||
|
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
|
||||||
|
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
|
||||||
|
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
|
||||||
|
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
|
||||||
|
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
|
||||||
|
|
||||||
|
// For now, assume source is always YouTube
|
||||||
|
syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
|
||||||
|
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
|
||||||
|
rootCmd.AddCommand(cmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEnvDefault(key, defaultValue string) string {
|
||||||
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func localCmd(cmd *cobra.Command, args []string) {
|
||||||
|
err := syncContext.Validate()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
videoID := args[0]
|
||||||
|
|
||||||
|
log.Debugf("Running sync for video ID %s", videoID)
|
||||||
|
|
||||||
|
var publisher VideoPublisher
|
||||||
|
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error setting up publisher: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var videoSource VideoSource
|
||||||
|
if syncContext.YouTubeSourceConfig != nil {
|
||||||
|
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error setting up video source: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceVideo, err := videoSource.GetVideo(videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting source video: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error processing source video for publishing: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if syncContext.DryRun {
|
||||||
|
log.Infoln("This is a dry run. Nothing will be published.")
|
||||||
|
log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
|
||||||
|
log.Debugf("Object to be published: %v", processedVideo)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error publishing video: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if syncContext.ReflectStreams {
|
||||||
|
err = <-doneReflectingCh
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while wating for stream to reflect: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Debugln("Not waiting for stream to reflect.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !syncContext.KeepCache {
|
||||||
|
log.Infof("Deleting local files.")
|
||||||
|
err = videoSource.DeleteLocalCache(videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error deleting local files for video %s: %v", videoID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Done")
|
||||||
|
}
|
||||||
|
|
||||||
|
type SourceVideo struct {
|
||||||
|
ID string
|
||||||
|
Title *string
|
||||||
|
Description *string
|
||||||
|
SourceURL string
|
||||||
|
Languages []string
|
||||||
|
Tags []string
|
||||||
|
ReleaseTime *int64
|
||||||
|
ThumbnailURL *string
|
||||||
|
FullLocalPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PublishableVideo struct {
|
||||||
|
ID string
|
||||||
|
ClaimName string
|
||||||
|
Title string
|
||||||
|
Description string
|
||||||
|
SourceURL string
|
||||||
|
Languages []string
|
||||||
|
Tags []string
|
||||||
|
ReleaseTime int64
|
||||||
|
ThumbnailURL string
|
||||||
|
FullLocalPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
|
||||||
|
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error sanitizing tags: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
descriptionSample := ""
|
||||||
|
if source.Description != nil {
|
||||||
|
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
|
||||||
|
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
|
||||||
|
}
|
||||||
|
info := whatlanggo.Detect(descriptionSample)
|
||||||
|
|
||||||
|
title := ""
|
||||||
|
if source.Title != nil {
|
||||||
|
title = *source.Title
|
||||||
|
}
|
||||||
|
info2 := whatlanggo.Detect(title)
|
||||||
|
var languages []string = nil
|
||||||
|
if info.IsReliable() && info.Lang.Iso6391() != "" {
|
||||||
|
language := info.Lang.Iso6391()
|
||||||
|
languages = []string{language}
|
||||||
|
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
|
||||||
|
language := info2.Lang.Iso6391()
|
||||||
|
languages = []string{language}
|
||||||
|
}
|
||||||
|
|
||||||
|
claimName := namer.NewNamer().GetNextName(title)
|
||||||
|
|
||||||
|
thumbnailURL := source.ThumbnailURL
|
||||||
|
if thumbnailURL == nil {
|
||||||
|
thumbnailURL = util.PtrToString("")
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseTime := source.ReleaseTime
|
||||||
|
if releaseTime == nil {
|
||||||
|
releaseTime = util.PtrToInt64(time.Now().Unix())
|
||||||
|
}
|
||||||
|
|
||||||
|
processed := PublishableVideo {
|
||||||
|
ClaimName: claimName,
|
||||||
|
Title: title,
|
||||||
|
Description: getAbbrevDescription(source),
|
||||||
|
Languages: languages,
|
||||||
|
Tags: tags,
|
||||||
|
ReleaseTime: *releaseTime,
|
||||||
|
ThumbnailURL: *thumbnailURL,
|
||||||
|
FullLocalPath: source.FullLocalPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Video prepared for publication: %v", processed)
|
||||||
|
|
||||||
|
return &processed, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAbbrevDescription(v SourceVideo) string {
|
||||||
|
if v.Description == nil {
|
||||||
|
return v.SourceURL
|
||||||
|
}
|
||||||
|
|
||||||
|
additionalDescription := "\n...\n" + v.SourceURL
|
||||||
|
maxLength := 2800 - len(additionalDescription)
|
||||||
|
|
||||||
|
description := strings.TrimSpace(*v.Description)
|
||||||
|
if len(description) > maxLength {
|
||||||
|
description = description[:maxLength]
|
||||||
|
}
|
||||||
|
return description + additionalDescription
|
||||||
|
}
|
||||||
|
|
||||||
|
type VideoSource interface {
|
||||||
|
GetVideo(id string) (*SourceVideo, error)
|
||||||
|
DeleteLocalCache(id string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type VideoPublisher interface {
|
||||||
|
Publish(video PublishableVideo, reflectStream bool) (chan error, error)
|
||||||
|
}
|
125
local/localSDKPublisher.go
Normal file
125
local/localSDKPublisher.go
Normal file
|
@ -0,0 +1,125 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LocalSDKPublisher struct {
|
||||||
|
channelID string
|
||||||
|
publishBid float64
|
||||||
|
lbrynet *jsonrpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
|
||||||
|
lbrynet := jsonrpc.NewClient(sdkAddr)
|
||||||
|
lbrynet.SetRPCTimeout(5 * time.Minute)
|
||||||
|
|
||||||
|
status, err := lbrynet.Status()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !status.IsRunning {
|
||||||
|
return nil, errors.New("SDK is not running")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should check to see if the SDK owns the channel
|
||||||
|
|
||||||
|
// Should check to see if wallet is unlocked
|
||||||
|
// but jsonrpc.Client doesn't have WalletStatus method
|
||||||
|
// so skip for now
|
||||||
|
|
||||||
|
// Should check to see if streams are configured to be reflected and warn if not
|
||||||
|
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
|
||||||
|
// so use File.UploadingToReflector as a proxy for now
|
||||||
|
|
||||||
|
publisher := LocalSDKPublisher {
|
||||||
|
channelID: channelID,
|
||||||
|
publishBid: publishBid,
|
||||||
|
lbrynet: lbrynet,
|
||||||
|
}
|
||||||
|
return &publisher, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) {
|
||||||
|
streamCreateOptions := jsonrpc.StreamCreateOptions {
|
||||||
|
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
|
||||||
|
Title: &video.Title,
|
||||||
|
Description: &video.Description,
|
||||||
|
Languages: video.Languages,
|
||||||
|
ThumbnailURL: &video.ThumbnailURL,
|
||||||
|
Tags: video.Tags,
|
||||||
|
},
|
||||||
|
ReleaseTime: &video.ReleaseTime,
|
||||||
|
ChannelID: &p.channelID,
|
||||||
|
License: util.PtrToString("Copyrighted (contact publisher)"),
|
||||||
|
}
|
||||||
|
|
||||||
|
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflectStream {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error finding file by txid: %v", err)
|
||||||
|
done <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if fileListResponse == nil {
|
||||||
|
log.Errorf("Could not find file in list with correct txid")
|
||||||
|
done <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fileStatus := fileListResponse.Items[fileIndex]
|
||||||
|
if fileStatus.IsFullyReflected {
|
||||||
|
log.Info("Stream is fully reflected")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !fileStatus.UploadingToReflector {
|
||||||
|
log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
|
||||||
|
done <- errors.New("Stream is not being reflected (check lbrynet settings).")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
done <- nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
return done, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
|
||||||
|
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
|
||||||
|
response, err := client.FileList(0, 20)
|
||||||
|
for {
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting file list page: %v", err)
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
|
||||||
|
if index < len(response.Items) {
|
||||||
|
return response, index, nil
|
||||||
|
}
|
||||||
|
if response.Page >= response.TotalPages {
|
||||||
|
return nil, 0, nil
|
||||||
|
}
|
||||||
|
response, err = client.FileList(response.Page + 1, 20)
|
||||||
|
}
|
||||||
|
}
|
69
local/readme.md
Normal file
69
local/readme.md
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
# Running ytsync locally
|
||||||
|
|
||||||
|
## Requirements
|
||||||
|
|
||||||
|
- LBRY SDK (what do we actually need this for?)
|
||||||
|
- youtube-dl
|
||||||
|
- enough space to cache stuff
|
||||||
|
- YouTube data API key
|
||||||
|
|
||||||
|
|
||||||
|
## Process
|
||||||
|
|
||||||
|
### Ensuring requirements are met
|
||||||
|
|
||||||
|
- claim channel if there isn't one yet
|
||||||
|
- or easier, just error if no channel
|
||||||
|
- enough lbc in wallet?
|
||||||
|
|
||||||
|
### Getting a YouTube API key
|
||||||
|
|
||||||
|
To access the YouTube data API, you will first need some kind of google account.
|
||||||
|
|
||||||
|
The API has two methods of authentication, OAuth2 and API keys. This application uses API keys.
|
||||||
|
These API keys are basically like passwords, and so once obtained, they should not be shared.
|
||||||
|
|
||||||
|
The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application):
|
||||||
|
|
||||||
|
|
||||||
|
1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console.
|
||||||
|
2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**.
|
||||||
|
|
||||||
|
To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys).
|
||||||
|
|
||||||
|
### Options to figure out what's already synced
|
||||||
|
|
||||||
|
- simplest: assume nothing is synced yet
|
||||||
|
- assume everything before some video is synced
|
||||||
|
- get/put sync info from Odysee by proving you have private key for channel
|
||||||
|
- tag videos as having been synced from youtube so we can ensure accuracy
|
||||||
|
- hardest: scan channel and try to match up which videos are not synced yet
|
||||||
|
|
||||||
|
### Central DB
|
||||||
|
|
||||||
|
- prove you have a channel's private key to get info about that channel
|
||||||
|
- proper queue instead of sleeping for N minutes between syncs
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### Syncing a single video
|
||||||
|
|
||||||
|
- downloading it
|
||||||
|
- thumbnails
|
||||||
|
- metadata
|
||||||
|
- having enough LBC for publish(es)
|
||||||
|
- automated error handling
|
||||||
|
- getting a human involved for errors that can't be handled automatically
|
||||||
|
- reflecting
|
||||||
|
|
||||||
|
### Continuous Sync
|
||||||
|
|
||||||
|
- running in background
|
||||||
|
- storing local state
|
||||||
|
- interactions with our central ytsync db
|
||||||
|
- dealing with yt throttling
|
||||||
|
|
||||||
|
|
||||||
|
### Debugging
|
||||||
|
|
||||||
|
- dry-running the whole thing
|
45
local/youtubeEnricher.go
Normal file
45
local/youtubeEnricher.go
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type YouTubeVideoEnricher interface {
|
||||||
|
EnrichMissing(source *SourceVideo) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type YouTubeAPIVideoEnricher struct {
|
||||||
|
api *YouTubeAPI
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) {
|
||||||
|
enricher := YouTubeAPIVideoEnricher{
|
||||||
|
api: NewYouTubeAPI(apiKey),
|
||||||
|
}
|
||||||
|
return &enricher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
|
||||||
|
if source.ReleaseTime != nil {
|
||||||
|
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
snippet, err := e.api.GetVideoSnippet(source.ID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error snippet data for video %s: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error converting publishedAt to timestamp: %v", err)
|
||||||
|
} else {
|
||||||
|
source.ReleaseTime = util.PtrToInt64(publishedAt.Unix())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
83
local/ytapi.go
Normal file
83
local/ytapi.go
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type YouTubeAPI struct {
|
||||||
|
apiKey string
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYouTubeAPI(apiKey string) (*YouTubeAPI) {
|
||||||
|
client := &http.Client {
|
||||||
|
Transport: &http.Transport{
|
||||||
|
MaxIdleConns: 10,
|
||||||
|
IdleConnTimeout: 30 * time.Second,
|
||||||
|
DisableCompression: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
api := YouTubeAPI {
|
||||||
|
apiKey: apiKey,
|
||||||
|
client: client,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &api
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
|
||||||
|
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error creating http client for YouTube API: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
query := req.URL.Query()
|
||||||
|
query.Add("part", "snippet")
|
||||||
|
query.Add("id", videoID)
|
||||||
|
query.Add("key", a.apiKey)
|
||||||
|
req.URL.RawQuery = query.Encode()
|
||||||
|
|
||||||
|
req.Header.Add("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error from YouTube API: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
log.Tracef("Response from YouTube API: %s", string(body[:]))
|
||||||
|
|
||||||
|
var result videoListResponse
|
||||||
|
err = json.Unmarshal(body, &result)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(result.Items) != 1 {
|
||||||
|
err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &result.Items[0].Snippet, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type videoListResponse struct {
|
||||||
|
Items []struct {
|
||||||
|
Snippet VideoSnippet `json:"snippet"`
|
||||||
|
} `json:"items"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type VideoSnippet struct {
|
||||||
|
PublishedAt string `json:"publishedAt"`
|
||||||
|
}
|
239
local/ytdl.go
Normal file
239
local/ytdl.go
Normal file
|
@ -0,0 +1,239 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/ytsync/v5/downloader/ytdl"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Ytdl struct {
|
||||||
|
DownloadDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYtdl(downloadDir string) (*Ytdl, error) {
|
||||||
|
// TODO validate download dir
|
||||||
|
|
||||||
|
y := Ytdl {
|
||||||
|
DownloadDir: downloadDir,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &y, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) {
|
||||||
|
metadataPath, err := y.GetVideoMetadataFile(videoID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
metadataBytes, err := os.ReadFile(metadataPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var metadata *ytdl.YtdlVideo
|
||||||
|
err = json.Unmarshal(metadataBytes, &metadata)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return metadata, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) {
|
||||||
|
basePath := path.Join(y.DownloadDir, videoID)
|
||||||
|
metadataPath := basePath + ".info.json"
|
||||||
|
|
||||||
|
_, err := os.Stat(metadataPath)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
log.Errorf("Error determining if video metadata already exists: %v", err)
|
||||||
|
return "", err
|
||||||
|
} else if err != nil {
|
||||||
|
log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID)
|
||||||
|
err = downloadVideoMetadata(basePath, videoID)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return metadataPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (y *Ytdl) GetVideoFile(videoID string) (string, error) {
|
||||||
|
videoPath, err := findDownloadedVideo(y.DownloadDir, videoID)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if videoPath != nil {
|
||||||
|
return *videoPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
basePath := path.Join(y.DownloadDir, videoID)
|
||||||
|
metadataPath, err := y.GetVideoMetadataFile(videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting metadata path in preparation for video download: %v", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
err = downloadVideo(basePath, metadataPath)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
videoPath, err = findDownloadedVideo(y.DownloadDir, videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if videoPath == nil {
|
||||||
|
return "", errors.New("Could not find a downloaded video after successful download.")
|
||||||
|
}
|
||||||
|
|
||||||
|
return *videoPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (y *Ytdl) DeleteVideoFiles(videoID string) error {
|
||||||
|
files, err := ioutil.ReadDir(y.DownloadDir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range files {
|
||||||
|
if f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.Contains(f.Name(), videoID) {
|
||||||
|
videoPath := path.Join(y.DownloadDir, f.Name())
|
||||||
|
err = os.Remove(videoPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteFile(path string) error {
|
||||||
|
_, err := os.Stat(path)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
log.Errorf("Error determining if file %s exists: %v", path, err)
|
||||||
|
return err
|
||||||
|
} else if err != nil {
|
||||||
|
log.Debugf("File %s does not exist. Skipping deletion.", path)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.Remove(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func findDownloadedVideo(videoDir, videoID string) (*string, error) {
|
||||||
|
files, err := ioutil.ReadDir(videoDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range files {
|
||||||
|
if f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) {
|
||||||
|
videoPath := path.Join(videoDir, f.Name())
|
||||||
|
return &videoPath, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func downloadVideoMetadata(basePath, videoID string) error {
|
||||||
|
ytdlArgs := []string{
|
||||||
|
"--skip-download",
|
||||||
|
"--write-info-json",
|
||||||
|
"--force-overwrites",
|
||||||
|
fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
|
||||||
|
"--cookies",
|
||||||
|
"cookies.txt",
|
||||||
|
"-o",
|
||||||
|
basePath,
|
||||||
|
}
|
||||||
|
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
|
||||||
|
output, err := runCmd(ytdlCmd)
|
||||||
|
log.Debug(output)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func downloadVideo(basePath, metadataPath string) error {
|
||||||
|
ytdlArgs := []string{
|
||||||
|
"--no-progress",
|
||||||
|
"-o",
|
||||||
|
basePath,
|
||||||
|
"--merge-output-format",
|
||||||
|
"mp4",
|
||||||
|
"--postprocessor-args",
|
||||||
|
"ffmpeg:-movflags faststart",
|
||||||
|
"--abort-on-unavailable-fragment",
|
||||||
|
"--fragment-retries",
|
||||||
|
"1",
|
||||||
|
"--cookies",
|
||||||
|
"cookies.txt",
|
||||||
|
"--extractor-args",
|
||||||
|
"youtube:player_client=android",
|
||||||
|
"--load-info-json",
|
||||||
|
metadataPath,
|
||||||
|
"-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]",
|
||||||
|
}
|
||||||
|
|
||||||
|
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
|
||||||
|
output, err := runCmd(ytdlCmd)
|
||||||
|
log.Debug(output)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func runCmd(cmd *exec.Cmd) ([]string, error) {
|
||||||
|
log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
|
||||||
|
var err error
|
||||||
|
stderr, err := cmd.StderrPipe()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
stdout, err := cmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = cmd.Start()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
outLog, err := ioutil.ReadAll(stdout)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
errorLog, err := ioutil.ReadAll(stderr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
done <- cmd.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-done:
|
||||||
|
if err != nil {
|
||||||
|
log.Error(string(errorLog))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
|
||||||
|
}
|
||||||
|
}
|
76
local/ytdlVideoSource.go
Normal file
76
local/ytdlVideoSource.go
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/ytsync/v5/downloader/ytdl"
|
||||||
|
)
|
||||||
|
|
||||||
|
type YtdlVideoSource struct {
|
||||||
|
downloader Ytdl
|
||||||
|
enrichers []YouTubeVideoEnricher
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) {
|
||||||
|
ytdl, err := NewYtdl(downloadDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
source := YtdlVideoSource {
|
||||||
|
downloader: *ytdl,
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.YouTubeAPIKey != "" {
|
||||||
|
ytapiEnricher := NewYouTubeAPIVideoEnricher(config.YouTubeAPIKey)
|
||||||
|
source.enrichers = append(source.enrichers, ytapiEnricher)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &source, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
|
||||||
|
metadata, err := s.downloader.GetVideoMetadata(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
videoPath, err := s.downloader.GetVideoFile(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var bestThumbnail *ytdl.Thumbnail = nil
|
||||||
|
for i, thumbnail := range metadata.Thumbnails {
|
||||||
|
if i == 0 || bestThumbnail.Width < thumbnail.Width {
|
||||||
|
bestThumbnail = &thumbnail
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceVideo := SourceVideo {
|
||||||
|
ID: id,
|
||||||
|
Title: &metadata.Title,
|
||||||
|
Description: &metadata.Description,
|
||||||
|
SourceURL: "\nhttps://www.youtube.com/watch?v=" + id,
|
||||||
|
Languages: []string{},
|
||||||
|
Tags: metadata.Tags,
|
||||||
|
ReleaseTime: nil,
|
||||||
|
ThumbnailURL: &bestThumbnail.URL,
|
||||||
|
FullLocalPath: videoPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, enricher := range s.enrichers {
|
||||||
|
err = enricher.EnrichMissing(&sourceVideo)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Source video retrieved via ytdl: %v", sourceVideo)
|
||||||
|
|
||||||
|
return &sourceVideo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *YtdlVideoSource) DeleteLocalCache(id string) error {
|
||||||
|
return s.downloader.DeleteVideoFiles(id)
|
||||||
|
}
|
158
main.go
158
main.go
|
@ -1,172 +1,24 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/ytsync/v5/cmd"
|
||||||
"github.com/lbryio/lbry.go/v2/extras/util"
|
|
||||||
"github.com/lbryio/ytsync/v5/manager"
|
|
||||||
"github.com/lbryio/ytsync/v5/sdk"
|
|
||||||
"github.com/lbryio/ytsync/v5/shared"
|
|
||||||
ytUtils "github.com/lbryio/ytsync/v5/util"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
|
||||||
)
|
|
||||||
|
|
||||||
var Version string
|
|
||||||
|
|
||||||
const defaultMaxTries = 3
|
|
||||||
|
|
||||||
var (
|
|
||||||
cliFlags shared.SyncFlags
|
|
||||||
maxVideoLength int
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
log.SetLevel(log.DebugLevel)
|
log.SetLevel(log.DebugLevel)
|
||||||
http.Handle("/metrics", promhttp.Handler())
|
|
||||||
go func() {
|
go func() {
|
||||||
|
http.Handle("/metrics", promhttp.Handler())
|
||||||
log.Error(http.ListenAndServe(":2112", nil))
|
log.Error(http.ListenAndServe(":2112", nil))
|
||||||
}()
|
}()
|
||||||
cmd := &cobra.Command{
|
|
||||||
Use: "ytsync",
|
|
||||||
Short: "Publish youtube channels into LBRY network automatically.",
|
|
||||||
Run: ytSync,
|
|
||||||
Args: cobra.RangeArgs(0, 0),
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
|
cmd.Execute()
|
||||||
cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
|
|
||||||
cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
|
|
||||||
cmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
|
|
||||||
cmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
|
|
||||||
cmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
|
|
||||||
cmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
|
|
||||||
cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
|
|
||||||
cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
|
|
||||||
cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
|
|
||||||
cmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
|
|
||||||
cmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
|
|
||||||
cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
|
|
||||||
cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
|
|
||||||
cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
|
|
||||||
cmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
|
|
||||||
cmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
|
|
||||||
cmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
|
|
||||||
cmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
|
|
||||||
|
|
||||||
if err := cmd.Execute(); err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func ytSync(cmd *cobra.Command, args []string) {
|
|
||||||
var hostname string
|
|
||||||
slackToken := os.Getenv("SLACK_TOKEN")
|
|
||||||
if slackToken == "" {
|
|
||||||
log.Error("A slack token was not present in env vars! Slack messages disabled!")
|
|
||||||
} else {
|
|
||||||
var err error
|
|
||||||
hostname, err = os.Hostname()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("could not detect system hostname")
|
|
||||||
hostname = "ytsync-unknown"
|
|
||||||
}
|
|
||||||
if len(hostname) > 30 {
|
|
||||||
hostname = hostname[0:30]
|
|
||||||
}
|
|
||||||
|
|
||||||
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
|
|
||||||
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if cliFlags.MaxTries < 1 {
|
|
||||||
log.Errorln("setting --max-tries less than 1 doesn't make sense")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if cliFlags.Limit < 0 {
|
|
||||||
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
|
|
||||||
|
|
||||||
apiURL := os.Getenv("LBRY_WEB_API")
|
|
||||||
apiToken := os.Getenv("LBRY_API_TOKEN")
|
|
||||||
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
|
|
||||||
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
|
|
||||||
awsS3ID := os.Getenv("AWS_S3_ID")
|
|
||||||
awsS3Secret := os.Getenv("AWS_S3_SECRET")
|
|
||||||
awsS3Region := os.Getenv("AWS_S3_REGION")
|
|
||||||
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
|
|
||||||
if apiURL == "" {
|
|
||||||
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if apiToken == "" {
|
|
||||||
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if youtubeAPIKey == "" {
|
|
||||||
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if awsS3ID == "" {
|
|
||||||
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if awsS3Secret == "" {
|
|
||||||
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if awsS3Region == "" {
|
|
||||||
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if awsS3Bucket == "" {
|
|
||||||
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if lbrycrdDsn == "" {
|
|
||||||
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
|
|
||||||
}
|
|
||||||
|
|
||||||
blobsDir := ytUtils.GetBlobsDir()
|
|
||||||
|
|
||||||
apiConfig := &sdk.APIConfig{
|
|
||||||
YoutubeAPIKey: youtubeAPIKey,
|
|
||||||
ApiURL: apiURL,
|
|
||||||
ApiToken: apiToken,
|
|
||||||
HostName: hostname,
|
|
||||||
}
|
|
||||||
awsConfig := &shared.AwsConfigs{
|
|
||||||
AwsS3ID: awsS3ID,
|
|
||||||
AwsS3Secret: awsS3Secret,
|
|
||||||
AwsS3Region: awsS3Region,
|
|
||||||
AwsS3Bucket: awsS3Bucket,
|
|
||||||
}
|
|
||||||
sm := manager.NewSyncManager(
|
|
||||||
cliFlags,
|
|
||||||
blobsDir,
|
|
||||||
lbrycrdDsn,
|
|
||||||
awsConfig,
|
|
||||||
apiConfig,
|
|
||||||
)
|
|
||||||
err := sm.Start()
|
|
||||||
if err != nil {
|
|
||||||
ytUtils.SendErrorToSlack(errors.FullTrace(err))
|
|
||||||
}
|
|
||||||
ytUtils.SendInfoToSlack("Syncing process terminated!")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,9 +58,12 @@ func (s *SyncManager) Start() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var lastChannelProcessed string
|
var (
|
||||||
var secondLastChannelProcessed string
|
lastChannelProcessed string
|
||||||
syncCount := 0
|
secondLastChannelProcessed string
|
||||||
|
syncCount int
|
||||||
|
)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
s.channelsToSync = make([]Sync, 0, 10) // reset sync queue
|
s.channelsToSync = make([]Sync, 0, 10) // reset sync queue
|
||||||
err := s.checkUsedSpace()
|
err := s.checkUsedSpace()
|
||||||
|
@ -108,10 +111,12 @@ func (s *SyncManager) Start() error {
|
||||||
log.Infof("Drained the \"%s\" queue", q)
|
log.Infof("Drained the \"%s\" queue", q)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(s.channelsToSync) == 0 {
|
if len(s.channelsToSync) == 0 {
|
||||||
log.Infoln("No channels to sync. Pausing 5 minutes!")
|
log.Infoln("No channels to sync. Pausing 5 minutes!")
|
||||||
time.Sleep(5 * time.Minute)
|
time.Sleep(5 * time.Minute)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, sync := range s.channelsToSync {
|
for _, sync := range s.channelsToSync {
|
||||||
if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed {
|
if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed {
|
||||||
util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId)
|
util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId)
|
||||||
|
@ -174,6 +179,7 @@ func (s *SyncManager) Start() error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue