Compare commits

...

8 commits

Author SHA1 Message Date
pseudoscalar
ce901f6b01 Add option to not wait for reflection. Add instructions for getting a YouTube API key. Fix some minor issues. 2021-11-17 10:07:20 -06:00
pseudoscalar
9f6b15e841 Clean up local cache after publishing stream 2021-11-05 11:22:20 -05:00
pseudoscalar
e564dc8445 Add dry-run option 2021-11-05 10:09:59 -05:00
pseudoscalar
eb30fa4299 Determine release time via YouTube API 2021-11-05 09:39:43 -05:00
pseudoscalar
2ba960ae01 Refactor to support future direction of development 2021-11-05 09:39:43 -05:00
pseudoscalar
8ea15afce8 Basic stream publishing. Still needs some work. 2021-11-05 09:39:43 -05:00
pseudoscalar
b9bf2f6e73 Download video to sync to local cache 2021-11-05 09:39:43 -05:00
Alex Grintsvayg
e554bbfe18
started work on local ytsync 2021-10-25 10:45:39 -04:00
12 changed files with 1077 additions and 267 deletions

View file

@ -1,111 +0,0 @@
From 30380338ba9af01696c94b61f0597131638eaec1 Mon Sep 17 00:00:00 2001
From: Niko Storni <niko@lbry.io>
Date: Mon, 16 Dec 2019 00:13:36 +0100
Subject: [PATCH] lbry-patch
---
youtube_dl/extractor/youtube.py | 45 +++++++++++++++++++++++++--------
1 file changed, 35 insertions(+), 10 deletions(-)
diff --git a/youtube_dl/extractor/youtube.py b/youtube_dl/extractor/youtube.py
index b913d07a6..cd66a5b01 100644
--- a/youtube_dl/extractor/youtube.py
+++ b/youtube_dl/extractor/youtube.py
@@ -10,6 +10,7 @@ import random
import re
import time
import traceback
+import subprocess
from .common import InfoExtractor, SearchInfoExtractor
from ..jsinterp import JSInterpreter
@@ -536,6 +537,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
_GEO_BYPASS = False
+ _WGET_429_RATE_LIMIT = 8191
+ _WGET_BINARY = "wget"
+
IE_NAME = 'youtube'
_TESTS = [
{
@@ -1254,6 +1258,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
""" Return a string representation of a signature """
return '.'.join(compat_str(len(part)) for part in example_sig.split('.'))
+ def _rate_limit_download(self, url, video_id, note=None):
+ if note is None:
+ self.report_download_webpage(video_id)
+ elif note is not False:
+ if video_id is None:
+ self.to_screen('%s' % (note,))
+ else:
+ self.to_screen('%s: %s' % (video_id, note))
+ source_address = self._downloader.params.get('source_address')
+ return subprocess.run([self._WGET_BINARY, '-q', '--limit-rate', str(self._WGET_429_RATE_LIMIT), '--bind-address', source_address, '-O', '-', url], check=True, stdout=subprocess.PIPE).stdout.decode(encoding='UTF-8')
+
def _extract_signature_function(self, video_id, player_url, example_sig):
id_m = re.match(
r'.*?-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player(?:-new)?|(?:/[a-z]{2,3}_[A-Z]{2})?/base)?\.(?P<ext>[a-z]+)$',
@@ -1678,7 +1693,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
# Get video webpage
url = proto + '://www.youtube.com/watch?v=%s&gl=US&hl=en&has_verified=1&bpctr=9999999999' % video_id
- video_webpage = self._download_webpage(url, video_id)
+ video_webpage = self._rate_limit_download(url, video_id)
# Attempt to extract SWF player URL
mobj = re.search(r'swfConfig.*?"(https?:\\/\\/.*?watch.*?-.*?\.swf)"', video_webpage)
@@ -1736,10 +1751,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
r'"sts"\s*:\s*(\d+)', embed_webpage, 'sts', default=''),
})
video_info_url = proto + '://www.youtube.com/get_video_info?' + data
- video_info_webpage = self._download_webpage(
+ video_info_webpage = self._rate_limit_download(
video_info_url, video_id,
- note='Refetching age-gated info webpage',
- errnote='unable to download video info webpage')
+ note='Refetching age-gated info webpage')
video_info = compat_parse_qs(video_info_webpage)
pl_response = video_info.get('player_response', [None])[0]
player_response = extract_player_response(pl_response, video_id)
@@ -1777,7 +1791,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
# The general idea is to take a union of itags of both DASH manifests (for example
# video with such 'manifest behavior' see https://github.com/ytdl-org/youtube-dl/issues/6093)
self.report_video_info_webpage_download(video_id)
- for el in ('embedded', 'detailpage', 'vevo', ''):
+ for el in ('', 'embedded', 'detailpage', 'vevo'):
query = {
'video_id': video_id,
'ps': 'default',
@@ -1789,11 +1803,22 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
query['el'] = el
if sts:
query['sts'] = sts
- video_info_webpage = self._download_webpage(
- '%s://www.youtube.com/get_video_info' % proto,
- video_id, note=False,
- errnote='unable to download video info webpage',
- fatal=False, query=query)
+
+ if el == '':
+ base_url = 'https://youtube.com/get_video_info?video_id={}'.format(video_id)
+ else:
+ base_url = 'https://youtube.com/get_video_info'
+
+ for q in query:
+ if q is None or q is "":
+ continue
+ if query[q] is None or query[q] is "":
+ continue
+
+ base_url = base_url + "?{}={}".format(q, query[q])
+
+ video_info_webpage = self._rate_limit_download(base_url, video_id)
+
if not video_info_webpage:
continue
get_video_info = compat_parse_qs(video_info_webpage)
--
2.17.1

7
cmd/local.go Normal file
View file

@ -0,0 +1,7 @@
package cmd
import "github.com/lbryio/ytsync/v5/local"
func init() {
local.AddCommand(rootCmd)
}

166
cmd/root.go Normal file
View file

@ -0,0 +1,166 @@
package cmd
import (
"os"
"time"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
const defaultMaxTries = 3
var (
cliFlags shared.SyncFlags
maxVideoLength int
)
var rootCmd = &cobra.Command{
Use: "ytsync",
Short: "Publish youtube channels into LBRY network automatically.",
Run: ytSync,
Args: cobra.RangeArgs(0, 0),
}
func init() {
rootCmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
rootCmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
rootCmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
rootCmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
rootCmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
rootCmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
rootCmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
rootCmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
rootCmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
rootCmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
rootCmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
rootCmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
rootCmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
rootCmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
rootCmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
rootCmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
rootCmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
rootCmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
rootCmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
}
func Execute() {
err := rootCmd.Execute()
if err != nil {
log.Errorln(err)
os.Exit(1)
}
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return
}
if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if cliFlags.Limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
cliFlags,
blobsDir,
lbrycrdDsn,
awsConfig,
apiConfig,
)
err := sm.Start()
if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err))
}
ytUtils.SendInfoToSlack("Syncing process terminated!")
}

253
local/local.go Normal file
View file

@ -0,0 +1,253 @@
package local
import (
"errors"
"os"
"regexp"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/abadojack/whatlanggo"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/tags_manager"
)
type SyncContext struct {
DryRun bool
KeepCache bool
ReflectStreams bool
TempDir string
LbrynetAddr string
ChannelID string
PublishBid float64
YouTubeSourceConfig *YouTubeSourceConfig
}
func (c *SyncContext) Validate() error {
if c.TempDir == "" {
return errors.New("No TempDir provided")
}
if c.LbrynetAddr == "" {
return errors.New("No Lbrynet address provided")
}
if c.ChannelID == "" {
return errors.New("No channel ID provided")
}
if c.PublishBid <= 0.0 {
return errors.New("Publish bid is not greater than zero")
}
return nil
}
type YouTubeSourceConfig struct {
YouTubeAPIKey string
}
var syncContext SyncContext
func AddCommand(rootCmd *cobra.Command) {
cmd := &cobra.Command{
Use: "local",
Short: "run a personal ytsync",
Run: localCmd,
Args: cobra.ExactArgs(1),
}
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
// For now, assume source is always YouTube
syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
rootCmd.AddCommand(cmd)
}
func getEnvDefault(key, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}
func localCmd(cmd *cobra.Command, args []string) {
err := syncContext.Validate()
if err != nil {
log.Error(err)
return
}
videoID := args[0]
log.Debugf("Running sync for video ID %s", videoID)
var publisher VideoPublisher
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
if err != nil {
log.Errorf("Error setting up publisher: %v", err)
return
}
var videoSource VideoSource
if syncContext.YouTubeSourceConfig != nil {
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig)
if err != nil {
log.Errorf("Error setting up video source: %v", err)
return
}
}
sourceVideo, err := videoSource.GetVideo(videoID)
if err != nil {
log.Errorf("Error getting source video: %v", err)
return
}
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
if err != nil {
log.Errorf("Error processing source video for publishing: %v", err)
return
}
if syncContext.DryRun {
log.Infoln("This is a dry run. Nothing will be published.")
log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
log.Debugf("Object to be published: %v", processedVideo)
} else {
doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
if err != nil {
log.Errorf("Error publishing video: %v", err)
return
}
if syncContext.ReflectStreams {
err = <-doneReflectingCh
if err != nil {
log.Errorf("Error while wating for stream to reflect: %v", err)
}
} else {
log.Debugln("Not waiting for stream to reflect.")
}
}
if !syncContext.KeepCache {
log.Infof("Deleting local files.")
err = videoSource.DeleteLocalCache(videoID)
if err != nil {
log.Errorf("Error deleting local files for video %s: %v", videoID, err)
}
}
log.Info("Done")
}
type SourceVideo struct {
ID string
Title *string
Description *string
SourceURL string
Languages []string
Tags []string
ReleaseTime *int64
ThumbnailURL *string
FullLocalPath string
}
type PublishableVideo struct {
ID string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
}
func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
if err != nil {
log.Errorf("Error sanitizing tags: %v", err)
return nil, err
}
descriptionSample := ""
if source.Description != nil {
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
}
info := whatlanggo.Detect(descriptionSample)
title := ""
if source.Title != nil {
title = *source.Title
}
info2 := whatlanggo.Detect(title)
var languages []string = nil
if info.IsReliable() && info.Lang.Iso6391() != "" {
language := info.Lang.Iso6391()
languages = []string{language}
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
language := info2.Lang.Iso6391()
languages = []string{language}
}
claimName := namer.NewNamer().GetNextName(title)
thumbnailURL := source.ThumbnailURL
if thumbnailURL == nil {
thumbnailURL = util.PtrToString("")
}
releaseTime := source.ReleaseTime
if releaseTime == nil {
releaseTime = util.PtrToInt64(time.Now().Unix())
}
processed := PublishableVideo {
ClaimName: claimName,
Title: title,
Description: getAbbrevDescription(source),
Languages: languages,
Tags: tags,
ReleaseTime: *releaseTime,
ThumbnailURL: *thumbnailURL,
FullLocalPath: source.FullLocalPath,
}
log.Debugf("Video prepared for publication: %v", processed)
return &processed, nil
}
func getAbbrevDescription(v SourceVideo) string {
if v.Description == nil {
return v.SourceURL
}
additionalDescription := "\n...\n" + v.SourceURL
maxLength := 2800 - len(additionalDescription)
description := strings.TrimSpace(*v.Description)
if len(description) > maxLength {
description = description[:maxLength]
}
return description + additionalDescription
}
type VideoSource interface {
GetVideo(id string) (*SourceVideo, error)
DeleteLocalCache(id string) error
}
type VideoPublisher interface {
Publish(video PublishableVideo, reflectStream bool) (chan error, error)
}

125
local/localSDKPublisher.go Normal file
View file

@ -0,0 +1,125 @@
package local
import (
"errors"
"sort"
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type LocalSDKPublisher struct {
channelID string
publishBid float64
lbrynet *jsonrpc.Client
}
func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
lbrynet := jsonrpc.NewClient(sdkAddr)
lbrynet.SetRPCTimeout(5 * time.Minute)
status, err := lbrynet.Status()
if err != nil {
return nil, err
}
if !status.IsRunning {
return nil, errors.New("SDK is not running")
}
// Should check to see if the SDK owns the channel
// Should check to see if wallet is unlocked
// but jsonrpc.Client doesn't have WalletStatus method
// so skip for now
// Should check to see if streams are configured to be reflected and warn if not
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
// so use File.UploadingToReflector as a proxy for now
publisher := LocalSDKPublisher {
channelID: channelID,
publishBid: publishBid,
lbrynet: lbrynet,
}
return &publisher, nil
}
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) {
streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &video.Title,
Description: &video.Description,
Languages: video.Languages,
ThumbnailURL: &video.ThumbnailURL,
Tags: video.Tags,
},
ReleaseTime: &video.ReleaseTime,
ChannelID: &p.channelID,
License: util.PtrToString("Copyrighted (contact publisher)"),
}
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
if err != nil {
return nil, err
}
if !reflectStream {
return nil, nil
}
done := make(chan error, 1)
go func() {
for {
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
if err != nil {
log.Errorf("Error finding file by txid: %v", err)
done <- err
return
}
if fileListResponse == nil {
log.Errorf("Could not find file in list with correct txid")
done <- err
return
}
fileStatus := fileListResponse.Items[fileIndex]
if fileStatus.IsFullyReflected {
log.Info("Stream is fully reflected")
break
}
if !fileStatus.UploadingToReflector {
log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
done <- errors.New("Stream is not being reflected (check lbrynet settings).")
return
}
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
time.Sleep(5 * time.Second)
}
done <- nil
}()
return done, nil
}
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20)
for {
if err != nil {
log.Errorf("Error getting file list page: %v", err)
return nil, 0, err
}
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
if index < len(response.Items) {
return response, index, nil
}
if response.Page >= response.TotalPages {
return nil, 0, nil
}
response, err = client.FileList(response.Page + 1, 20)
}
}

69
local/readme.md Normal file
View file

@ -0,0 +1,69 @@
# Running ytsync locally
## Requirements
- LBRY SDK (what do we actually need this for?)
- youtube-dl
- enough space to cache stuff
- YouTube data API key
## Process
### Ensuring requirements are met
- claim channel if there isn't one yet
- or easier, just error if no channel
- enough lbc in wallet?
### Getting a YouTube API key
To access the YouTube data API, you will first need some kind of google account.
The API has two methods of authentication, OAuth2 and API keys. This application uses API keys.
These API keys are basically like passwords, and so once obtained, they should not be shared.
The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application):
1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console.
2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**.
To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys).
### Options to figure out what's already synced
- simplest: assume nothing is synced yet
- assume everything before some video is synced
- get/put sync info from Odysee by proving you have private key for channel
- tag videos as having been synced from youtube so we can ensure accuracy
- hardest: scan channel and try to match up which videos are not synced yet
### Central DB
- prove you have a channel's private key to get info about that channel
- proper queue instead of sleeping for N minutes between syncs
### Syncing a single video
- downloading it
- thumbnails
- metadata
- having enough LBC for publish(es)
- automated error handling
- getting a human involved for errors that can't be handled automatically
- reflecting
### Continuous Sync
- running in background
- storing local state
- interactions with our central ytsync db
- dealing with yt throttling
### Debugging
- dry-running the whole thing

45
local/youtubeEnricher.go Normal file
View file

@ -0,0 +1,45 @@
package local
import (
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type YouTubeVideoEnricher interface {
EnrichMissing(source *SourceVideo) error
}
type YouTubeAPIVideoEnricher struct {
api *YouTubeAPI
}
func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) {
enricher := YouTubeAPIVideoEnricher{
api: NewYouTubeAPI(apiKey),
}
return &enricher
}
func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
if source.ReleaseTime != nil {
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
return nil
}
snippet, err := e.api.GetVideoSnippet(source.ID)
if err != nil {
log.Errorf("Error snippet data for video %s: %v", err)
return err
}
publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt)
if err != nil {
log.Errorf("Error converting publishedAt to timestamp: %v", err)
} else {
source.ReleaseTime = util.PtrToInt64(publishedAt.Unix())
}
return nil
}

83
local/ytapi.go Normal file
View file

@ -0,0 +1,83 @@
package local
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
log "github.com/sirupsen/logrus"
)
type YouTubeAPI struct {
apiKey string
client *http.Client
}
func NewYouTubeAPI(apiKey string) (*YouTubeAPI) {
client := &http.Client {
Transport: &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
DisableCompression: true,
},
}
api := YouTubeAPI {
apiKey: apiKey,
client: client,
}
return &api
}
func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil)
if err != nil {
log.Errorf("Error creating http client for YouTube API: %v", err)
return nil, err
}
query := req.URL.Query()
query.Add("part", "snippet")
query.Add("id", videoID)
query.Add("key", a.apiKey)
req.URL.RawQuery = query.Encode()
req.Header.Add("Accept", "application/json")
resp, err := a.client.Do(req)
defer resp.Body.Close()
if err != nil {
log.Errorf("Error from YouTube API: %v", err)
return nil, err
}
body, err := io.ReadAll(resp.Body)
log.Tracef("Response from YouTube API: %s", string(body[:]))
var result videoListResponse
err = json.Unmarshal(body, &result)
if err != nil {
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
return nil, err
}
if len(result.Items) != 1 {
err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID)
return nil, err
}
return &result.Items[0].Snippet, nil
}
type videoListResponse struct {
Items []struct {
Snippet VideoSnippet `json:"snippet"`
} `json:"items"`
}
type VideoSnippet struct {
PublishedAt string `json:"publishedAt"`
}

239
local/ytdl.go Normal file
View file

@ -0,0 +1,239 @@
package local
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"strings"
log "github.com/sirupsen/logrus"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
)
type Ytdl struct {
DownloadDir string
}
func NewYtdl(downloadDir string) (*Ytdl, error) {
// TODO validate download dir
y := Ytdl {
DownloadDir: downloadDir,
}
return &y, nil
}
func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) {
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
return nil, err
}
metadataBytes, err := os.ReadFile(metadataPath)
if err != nil {
return nil, err
}
var metadata *ytdl.YtdlVideo
err = json.Unmarshal(metadataBytes, &metadata)
if err != nil {
return nil, err
}
return metadata, nil
}
func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) {
basePath := path.Join(y.DownloadDir, videoID)
metadataPath := basePath + ".info.json"
_, err := os.Stat(metadataPath)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Error determining if video metadata already exists: %v", err)
return "", err
} else if err != nil {
log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID)
err = downloadVideoMetadata(basePath, videoID)
if err != nil {
return "", err
}
}
return metadataPath, nil
}
func (y *Ytdl) GetVideoFile(videoID string) (string, error) {
videoPath, err := findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
return "", err
}
if videoPath != nil {
return *videoPath, nil
}
basePath := path.Join(y.DownloadDir, videoID)
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
log.Errorf("Error getting metadata path in preparation for video download: %v", err)
return "", err
}
err = downloadVideo(basePath, metadataPath)
if err != nil {
return "", nil
}
videoPath, err = findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err)
return "", err
}
if videoPath == nil {
return "", errors.New("Could not find a downloaded video after successful download.")
}
return *videoPath, nil
}
func (y *Ytdl) DeleteVideoFiles(videoID string) error {
files, err := ioutil.ReadDir(y.DownloadDir)
if err != nil {
return err
}
for _, f := range files {
if f.IsDir() {
continue
}
if strings.Contains(f.Name(), videoID) {
videoPath := path.Join(y.DownloadDir, f.Name())
err = os.Remove(videoPath)
if err != nil {
log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err)
return err
}
}
}
return nil
}
func deleteFile(path string) error {
_, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Error determining if file %s exists: %v", path, err)
return err
} else if err != nil {
log.Debugf("File %s does not exist. Skipping deletion.", path)
return nil
}
return os.Remove(path)
}
func findDownloadedVideo(videoDir, videoID string) (*string, error) {
files, err := ioutil.ReadDir(videoDir)
if err != nil {
return nil, err
}
for _, f := range files {
if f.IsDir() {
continue
}
if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) {
videoPath := path.Join(videoDir, f.Name())
return &videoPath, nil
}
}
return nil, nil
}
func downloadVideoMetadata(basePath, videoID string) error {
ytdlArgs := []string{
"--skip-download",
"--write-info-json",
"--force-overwrites",
fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
"--cookies",
"cookies.txt",
"-o",
basePath,
}
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
log.Debug(output)
return err
}
func downloadVideo(basePath, metadataPath string) error {
ytdlArgs := []string{
"--no-progress",
"-o",
basePath,
"--merge-output-format",
"mp4",
"--postprocessor-args",
"ffmpeg:-movflags faststart",
"--abort-on-unavailable-fragment",
"--fragment-retries",
"1",
"--cookies",
"cookies.txt",
"--extractor-args",
"youtube:player_client=android",
"--load-info-json",
metadataPath,
"-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]",
}
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
log.Debug(output)
return err
}
func runCmd(cmd *exec.Cmd) ([]string, error) {
log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
var err error
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
outLog, err := ioutil.ReadAll(stdout)
if err != nil {
return nil, err
}
errorLog, err := ioutil.ReadAll(stderr)
if err != nil {
return nil, err
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case err := <-done:
if err != nil {
log.Error(string(errorLog))
return nil, err
}
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
}
}

76
local/ytdlVideoSource.go Normal file
View file

@ -0,0 +1,76 @@
package local
import (
log "github.com/sirupsen/logrus"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
)
type YtdlVideoSource struct {
downloader Ytdl
enrichers []YouTubeVideoEnricher
}
func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) {
ytdl, err := NewYtdl(downloadDir)
if err != nil {
return nil, err
}
source := YtdlVideoSource {
downloader: *ytdl,
}
if config.YouTubeAPIKey != "" {
ytapiEnricher := NewYouTubeAPIVideoEnricher(config.YouTubeAPIKey)
source.enrichers = append(source.enrichers, ytapiEnricher)
}
return &source, nil
}
func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
metadata, err := s.downloader.GetVideoMetadata(id)
if err != nil {
return nil, err
}
videoPath, err := s.downloader.GetVideoFile(id)
if err != nil {
return nil, err
}
var bestThumbnail *ytdl.Thumbnail = nil
for i, thumbnail := range metadata.Thumbnails {
if i == 0 || bestThumbnail.Width < thumbnail.Width {
bestThumbnail = &thumbnail
}
}
sourceVideo := SourceVideo {
ID: id,
Title: &metadata.Title,
Description: &metadata.Description,
SourceURL: "\nhttps://www.youtube.com/watch?v=" + id,
Languages: []string{},
Tags: metadata.Tags,
ReleaseTime: nil,
ThumbnailURL: &bestThumbnail.URL,
FullLocalPath: videoPath,
}
for _, enricher := range s.enrichers {
err = enricher.EnrichMissing(&sourceVideo)
if err != nil {
log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err)
}
}
log.Debugf("Source video retrieved via ytdl: %v", sourceVideo)
return &sourceVideo, nil
}
func (s *YtdlVideoSource) DeleteLocalCache(id string) error {
return s.downloader.DeleteVideoFiles(id)
}

158
main.go
View file

@ -1,172 +1,24 @@
package main
import (
"fmt"
"math/rand"
"net/http"
"os"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/cmd"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var Version string
const defaultMaxTries = 3
var (
cliFlags shared.SyncFlags
maxVideoLength int
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel)
http.Handle("/metrics", promhttp.Handler())
go func() {
http.Handle("/metrics", promhttp.Handler())
log.Error(http.ListenAndServe(":2112", nil))
}()
cmd := &cobra.Command{
Use: "ytsync",
Short: "Publish youtube channels into LBRY network automatically.",
Run: ytSync,
Args: cobra.RangeArgs(0, 0),
}
cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
cmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
cmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
cmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
cmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
cmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
cmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
if err := cmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return
}
if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if cliFlags.Limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
cliFlags,
blobsDir,
lbrycrdDsn,
awsConfig,
apiConfig,
)
err := sm.Start()
if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err))
}
ytUtils.SendInfoToSlack("Syncing process terminated!")
cmd.Execute()
}

View file

@ -58,9 +58,12 @@ func (s *SyncManager) Start() error {
}
}
var lastChannelProcessed string
var secondLastChannelProcessed string
syncCount := 0
var (
lastChannelProcessed string
secondLastChannelProcessed string
syncCount int
)
for {
s.channelsToSync = make([]Sync, 0, 10) // reset sync queue
err := s.checkUsedSpace()
@ -108,10 +111,12 @@ func (s *SyncManager) Start() error {
log.Infof("Drained the \"%s\" queue", q)
}
}
if len(s.channelsToSync) == 0 {
log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute)
}
for _, sync := range s.channelsToSync {
if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed {
util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId)
@ -174,6 +179,7 @@ func (s *SyncManager) Start() error {
break
}
}
return nil
}