Compare commits

...

1 commit

Author SHA1 Message Date
Alex Grintsvayg
e554bbfe18
started work on local ytsync 2021-10-25 10:45:39 -04:00
7 changed files with 262 additions and 267 deletions

View file

@ -1,111 +0,0 @@
From 30380338ba9af01696c94b61f0597131638eaec1 Mon Sep 17 00:00:00 2001
From: Niko Storni <niko@lbry.io>
Date: Mon, 16 Dec 2019 00:13:36 +0100
Subject: [PATCH] lbry-patch
---
youtube_dl/extractor/youtube.py | 45 +++++++++++++++++++++++++--------
1 file changed, 35 insertions(+), 10 deletions(-)
diff --git a/youtube_dl/extractor/youtube.py b/youtube_dl/extractor/youtube.py
index b913d07a6..cd66a5b01 100644
--- a/youtube_dl/extractor/youtube.py
+++ b/youtube_dl/extractor/youtube.py
@@ -10,6 +10,7 @@ import random
import re
import time
import traceback
+import subprocess
from .common import InfoExtractor, SearchInfoExtractor
from ..jsinterp import JSInterpreter
@@ -536,6 +537,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
_GEO_BYPASS = False
+ _WGET_429_RATE_LIMIT = 8191
+ _WGET_BINARY = "wget"
+
IE_NAME = 'youtube'
_TESTS = [
{
@@ -1254,6 +1258,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
""" Return a string representation of a signature """
return '.'.join(compat_str(len(part)) for part in example_sig.split('.'))
+ def _rate_limit_download(self, url, video_id, note=None):
+ if note is None:
+ self.report_download_webpage(video_id)
+ elif note is not False:
+ if video_id is None:
+ self.to_screen('%s' % (note,))
+ else:
+ self.to_screen('%s: %s' % (video_id, note))
+ source_address = self._downloader.params.get('source_address')
+ return subprocess.run([self._WGET_BINARY, '-q', '--limit-rate', str(self._WGET_429_RATE_LIMIT), '--bind-address', source_address, '-O', '-', url], check=True, stdout=subprocess.PIPE).stdout.decode(encoding='UTF-8')
+
def _extract_signature_function(self, video_id, player_url, example_sig):
id_m = re.match(
r'.*?-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player(?:-new)?|(?:/[a-z]{2,3}_[A-Z]{2})?/base)?\.(?P<ext>[a-z]+)$',
@@ -1678,7 +1693,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
# Get video webpage
url = proto + '://www.youtube.com/watch?v=%s&gl=US&hl=en&has_verified=1&bpctr=9999999999' % video_id
- video_webpage = self._download_webpage(url, video_id)
+ video_webpage = self._rate_limit_download(url, video_id)
# Attempt to extract SWF player URL
mobj = re.search(r'swfConfig.*?"(https?:\\/\\/.*?watch.*?-.*?\.swf)"', video_webpage)
@@ -1736,10 +1751,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
r'"sts"\s*:\s*(\d+)', embed_webpage, 'sts', default=''),
})
video_info_url = proto + '://www.youtube.com/get_video_info?' + data
- video_info_webpage = self._download_webpage(
+ video_info_webpage = self._rate_limit_download(
video_info_url, video_id,
- note='Refetching age-gated info webpage',
- errnote='unable to download video info webpage')
+ note='Refetching age-gated info webpage')
video_info = compat_parse_qs(video_info_webpage)
pl_response = video_info.get('player_response', [None])[0]
player_response = extract_player_response(pl_response, video_id)
@@ -1777,7 +1791,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
# The general idea is to take a union of itags of both DASH manifests (for example
# video with such 'manifest behavior' see https://github.com/ytdl-org/youtube-dl/issues/6093)
self.report_video_info_webpage_download(video_id)
- for el in ('embedded', 'detailpage', 'vevo', ''):
+ for el in ('', 'embedded', 'detailpage', 'vevo'):
query = {
'video_id': video_id,
'ps': 'default',
@@ -1789,11 +1803,22 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
query['el'] = el
if sts:
query['sts'] = sts
- video_info_webpage = self._download_webpage(
- '%s://www.youtube.com/get_video_info' % proto,
- video_id, note=False,
- errnote='unable to download video info webpage',
- fatal=False, query=query)
+
+ if el == '':
+ base_url = 'https://youtube.com/get_video_info?video_id={}'.format(video_id)
+ else:
+ base_url = 'https://youtube.com/get_video_info'
+
+ for q in query:
+ if q is None or q is "":
+ continue
+ if query[q] is None or query[q] is "":
+ continue
+
+ base_url = base_url + "?{}={}".format(q, query[q])
+
+ video_info_webpage = self._rate_limit_download(base_url, video_id)
+
if not video_info_webpage:
continue
get_video_info = compat_parse_qs(video_info_webpage)
--
2.17.1

7
cmd/local.go Normal file
View file

@ -0,0 +1,7 @@
package cmd
import "github.com/lbryio/ytsync/v5/local"
func init() {
local.AddCommand(rootCmd)
}

166
cmd/root.go Normal file
View file

@ -0,0 +1,166 @@
package cmd
import (
"os"
"time"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
const defaultMaxTries = 3
var (
cliFlags shared.SyncFlags
maxVideoLength int
)
var rootCmd = &cobra.Command{
Use: "ytsync",
Short: "Publish youtube channels into LBRY network automatically.",
Run: ytSync,
Args: cobra.RangeArgs(0, 0),
}
func init() {
rootCmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
rootCmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
rootCmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
rootCmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
rootCmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
rootCmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
rootCmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
rootCmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
rootCmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
rootCmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
rootCmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
rootCmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
rootCmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
rootCmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
rootCmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
rootCmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
rootCmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
rootCmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
rootCmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
}
func Execute() {
err := rootCmd.Execute()
if err != nil {
log.Errorln(err)
os.Exit(1)
}
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return
}
if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if cliFlags.Limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
cliFlags,
blobsDir,
lbrycrdDsn,
awsConfig,
apiConfig,
)
err := sm.Start()
if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err))
}
ytUtils.SendInfoToSlack("Syncing process terminated!")
}

22
local/local.go Normal file
View file

@ -0,0 +1,22 @@
package local
import (
"fmt"
"github.com/spf13/cobra"
)
func AddCommand(rootCmd *cobra.Command) {
cmd := &cobra.Command{
Use: "local",
Short: "run a personal ytsync",
Run: localCmd,
}
//cmd.Flags().StringVar(&cache, "cache", "", "path to cache")
rootCmd.AddCommand(cmd)
}
func localCmd(cmd *cobra.Command, args []string) {
fmt.Println("local")
}

53
local/readme.md Normal file
View file

@ -0,0 +1,53 @@
# Running ytsync locally
## Requirements
- LBRY SDK (what do we actually need this for?)
- youtube-dl
- enough space to cache stuff
## Process
### Ensuring requirements are met
- claim channel if there isn't one yet
- or easier, just error if no channel
- enough lbc in wallet?
### Options to figure out what's already synced
- simplest: assume nothing is synced yet
- assume everything before some video is synced
- get/put sync info from Odysee by proving you have private key for channel
- tag videos as having been synced from youtube so we can ensure accuracy
- hardest: scan channel and try to match up which videos are not synced yet
### Central DB
- prove you have a channel's private key to get info about that channel
- proper queue instead of sleeping for N minutes between syncs
### Syncing a single video
- downloading it
- thumbnails
- metadata
- having enough LBC for publish(es)
- automated error handling
- getting a human involved for errors that can't be handled automatically
- reflecting
### Continuous Sync
- running in background
- storing local state
- interactions with our central ytsync db
- dealing with yt throttling
### Debugging
- dry-running the whole thing

158
main.go
View file

@ -1,172 +1,24 @@
package main package main
import ( import (
"fmt"
"math/rand" "math/rand"
"net/http" "net/http"
"os"
"time" "time"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/ytsync/v5/cmd"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var Version string
const defaultMaxTries = 3
var (
cliFlags shared.SyncFlags
maxVideoLength int
) )
func main() { func main() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel) log.SetLevel(log.DebugLevel)
http.Handle("/metrics", promhttp.Handler())
go func() { go func() {
http.Handle("/metrics", promhttp.Handler())
log.Error(http.ListenAndServe(":2112", nil)) log.Error(http.ListenAndServe(":2112", nil))
}() }()
cmd := &cobra.Command{
Use: "ytsync",
Short: "Publish youtube channels into LBRY network automatically.",
Run: ytSync,
Args: cobra.RangeArgs(0, 0),
}
cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") cmd.Execute()
cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
cmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
cmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
cmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
cmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
cmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
cmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
if err := cmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return
}
if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if cliFlags.Limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
cliFlags,
blobsDir,
lbrycrdDsn,
awsConfig,
apiConfig,
)
err := sm.Start()
if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err))
}
ytUtils.SendInfoToSlack("Syncing process terminated!")
} }

View file

@ -58,9 +58,12 @@ func (s *SyncManager) Start() error {
} }
} }
var lastChannelProcessed string var (
var secondLastChannelProcessed string lastChannelProcessed string
syncCount := 0 secondLastChannelProcessed string
syncCount int
)
for { for {
s.channelsToSync = make([]Sync, 0, 10) // reset sync queue s.channelsToSync = make([]Sync, 0, 10) // reset sync queue
err := s.checkUsedSpace() err := s.checkUsedSpace()
@ -108,10 +111,12 @@ func (s *SyncManager) Start() error {
log.Infof("Drained the \"%s\" queue", q) log.Infof("Drained the \"%s\" queue", q)
} }
} }
if len(s.channelsToSync) == 0 { if len(s.channelsToSync) == 0 {
log.Infoln("No channels to sync. Pausing 5 minutes!") log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute) time.Sleep(5 * time.Minute)
} }
for _, sync := range s.channelsToSync { for _, sync := range s.channelsToSync {
if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed { if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed {
util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId) util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId)
@ -174,6 +179,7 @@ func (s *SyncManager) Start() error {
break break
} }
} }
return nil return nil
} }