Compare commits

...

11 commits

16 changed files with 2206 additions and 267 deletions


@@ -1,111 +0,0 @@
From 30380338ba9af01696c94b61f0597131638eaec1 Mon Sep 17 00:00:00 2001
From: Niko Storni <niko@lbry.io>
Date: Mon, 16 Dec 2019 00:13:36 +0100
Subject: [PATCH] lbry-patch
---
youtube_dl/extractor/youtube.py | 45 +++++++++++++++++++++++++--------
1 file changed, 35 insertions(+), 10 deletions(-)
diff --git a/youtube_dl/extractor/youtube.py b/youtube_dl/extractor/youtube.py
index b913d07a6..cd66a5b01 100644
--- a/youtube_dl/extractor/youtube.py
+++ b/youtube_dl/extractor/youtube.py
@@ -10,6 +10,7 @@ import random
import re
import time
import traceback
+import subprocess
from .common import InfoExtractor, SearchInfoExtractor
from ..jsinterp import JSInterpreter
@@ -536,6 +537,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
_GEO_BYPASS = False
+ _WGET_429_RATE_LIMIT = 8191
+ _WGET_BINARY = "wget"
+
IE_NAME = 'youtube'
_TESTS = [
{
@@ -1254,6 +1258,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
""" Return a string representation of a signature """
return '.'.join(compat_str(len(part)) for part in example_sig.split('.'))
+ def _rate_limit_download(self, url, video_id, note=None):
+ if note is None:
+ self.report_download_webpage(video_id)
+ elif note is not False:
+ if video_id is None:
+ self.to_screen('%s' % (note,))
+ else:
+ self.to_screen('%s: %s' % (video_id, note))
+ source_address = self._downloader.params.get('source_address')
+ return subprocess.run([self._WGET_BINARY, '-q', '--limit-rate', str(self._WGET_429_RATE_LIMIT), '--bind-address', source_address, '-O', '-', url], check=True, stdout=subprocess.PIPE).stdout.decode(encoding='UTF-8')
+
def _extract_signature_function(self, video_id, player_url, example_sig):
id_m = re.match(
r'.*?-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player(?:-new)?|(?:/[a-z]{2,3}_[A-Z]{2})?/base)?\.(?P<ext>[a-z]+)$',
@@ -1678,7 +1693,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
# Get video webpage
url = proto + '://www.youtube.com/watch?v=%s&gl=US&hl=en&has_verified=1&bpctr=9999999999' % video_id
- video_webpage = self._download_webpage(url, video_id)
+ video_webpage = self._rate_limit_download(url, video_id)
# Attempt to extract SWF player URL
mobj = re.search(r'swfConfig.*?"(https?:\\/\\/.*?watch.*?-.*?\.swf)"', video_webpage)
@@ -1736,10 +1751,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
r'"sts"\s*:\s*(\d+)', embed_webpage, 'sts', default=''),
})
video_info_url = proto + '://www.youtube.com/get_video_info?' + data
- video_info_webpage = self._download_webpage(
+ video_info_webpage = self._rate_limit_download(
video_info_url, video_id,
- note='Refetching age-gated info webpage',
- errnote='unable to download video info webpage')
+ note='Refetching age-gated info webpage')
video_info = compat_parse_qs(video_info_webpage)
pl_response = video_info.get('player_response', [None])[0]
player_response = extract_player_response(pl_response, video_id)
@@ -1777,7 +1791,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
# The general idea is to take a union of itags of both DASH manifests (for example
# video with such 'manifest behavior' see https://github.com/ytdl-org/youtube-dl/issues/6093)
self.report_video_info_webpage_download(video_id)
- for el in ('embedded', 'detailpage', 'vevo', ''):
+ for el in ('', 'embedded', 'detailpage', 'vevo'):
query = {
'video_id': video_id,
'ps': 'default',
@@ -1789,11 +1803,22 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
query['el'] = el
if sts:
query['sts'] = sts
- video_info_webpage = self._download_webpage(
- '%s://www.youtube.com/get_video_info' % proto,
- video_id, note=False,
- errnote='unable to download video info webpage',
- fatal=False, query=query)
+
+ if el == '':
+ base_url = 'https://youtube.com/get_video_info?video_id={}'.format(video_id)
+ else:
+ base_url = 'https://youtube.com/get_video_info'
+
+ for q in query:
+ if q is None or q is "":
+ continue
+ if query[q] is None or query[q] is "":
+ continue
+
+ base_url = base_url + "?{}={}".format(q, query[q])
+
+ video_info_webpage = self._rate_limit_download(base_url, video_id)
+
if not video_info_webpage:
continue
get_video_info = compat_parse_qs(video_info_webpage)
--
2.17.1

cmd/local.go (new file, 7 lines)

@@ -0,0 +1,7 @@
package cmd
import "github.com/lbryio/ytsync/v5/local"
func init() {
local.AddCommand(rootCmd)
}

cmd/root.go (new file, 166 lines)

@@ -0,0 +1,166 @@
package cmd
import (
"os"
"time"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
const defaultMaxTries = 3
var (
cliFlags shared.SyncFlags
maxVideoLength int
)
var rootCmd = &cobra.Command{
Use: "ytsync",
Short: "Publish youtube channels into LBRY network automatically.",
Run: ytSync,
Args: cobra.RangeArgs(0, 0),
}
func init() {
rootCmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
rootCmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
rootCmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
rootCmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
rootCmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
rootCmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
rootCmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
rootCmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
rootCmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
rootCmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
rootCmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
rootCmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
rootCmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
rootCmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
rootCmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
rootCmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
rootCmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
rootCmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
rootCmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
}
func Execute() {
err := rootCmd.Execute()
if err != nil {
log.Errorln(err)
os.Exit(1)
}
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return
}
if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if cliFlags.Limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
cliFlags,
blobsDir,
lbrycrdDsn,
awsConfig,
apiConfig,
)
err := sm.Start()
if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err))
}
ytUtils.SendInfoToSlack("Syncing process terminated!")
}

go.mod (1 line changed)

@@ -22,6 +22,7 @@ require (
github.com/kr/pretty v0.2.1 // indirect
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
github.com/mattn/go-sqlite3 v1.10.0
github.com/miekg/dns v1.1.22 // indirect
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect

go.sum (1 line changed)

@@ -318,6 +318,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=

local/local.go (new file, 460 lines)

@@ -0,0 +1,460 @@
package local
import (
"errors"
"os"
"regexp"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/abadojack/whatlanggo"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/tags_manager"
)
type SyncContext struct {
DryRun bool
KeepCache bool
ReflectStreams bool
ForceChannelScan bool
TempDir string
SyncDbPath string
LbrynetAddr string
ChannelID string
VideoID string
PublishBid float64
YouTubeSourceConfig *YouTubeSourceConfig
}
func (c *SyncContext) Validate() error {
if c.TempDir == "" {
return errors.New("No TempDir provided")
}
if c.SyncDbPath == "" {
return errors.New("No sync DB path provided")
}
if c.LbrynetAddr == "" {
return errors.New("No Lbrynet address provided")
}
if c.ChannelID == "" {
return errors.New("No channel ID provided")
}
if c.PublishBid <= 0.0 {
return errors.New("Publish bid is not greater than zero")
}
if c.YouTubeSourceConfig.ChannelID != "" {
// Validate for YouTube source
// For now, an API key is required
if c.YouTubeSourceConfig.APIKey == "" {
return errors.New("YouTube source was selected, but no YouTube API key was provided.")
}
} else {
return errors.New("No video source provided")
}
return nil
}
type YouTubeSourceConfig struct {
ChannelID string
APIKey string
}
var syncContext SyncContext
func AddCommand(rootCmd *cobra.Command) {
cmd := &cobra.Command{
Use: "local",
Short: "run a personal ytsync",
Run: localCmd,
Args: cobra.ExactArgs(0),
}
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
cmd.Flags().BoolVar(&syncContext.ForceChannelScan, "force-rescan", false, "Rescan channel to fill the sync DB.")
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB")
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
cmd.Flags().StringVar(&syncContext.VideoID, "video-id", "", "ID of video to sync. This will attempt to sync only this one video.")
syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.APIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.ChannelID, "youtube-channel", "", "YouTube Channel ID")
rootCmd.AddCommand(cmd)
}
func getEnvDefault(key, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}
func localCmd(cmd *cobra.Command, args []string) {
err := syncContext.Validate()
if err != nil {
log.Error(err)
return
}
syncDB, err := NewSyncDb(syncContext.SyncDbPath)
if err != nil {
log.Errorf("Error creating sync DB: %v", err)
return
}
defer syncDB.Close()
var publisher VideoPublisher
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
if err != nil {
log.Errorf("Error setting up publisher: %v", err)
return
}
var videoSource VideoSource
if syncContext.YouTubeSourceConfig != nil {
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig, syncDB)
if err != nil {
log.Errorf("Error setting up video source: %v", err)
return
}
}
latestPublishedReleaseTime := int64(0)
latestKnownReleaseTime := int64(0)
if syncContext.ForceChannelScan {
log.Infof("Channel scan is being forced.")
} else {
dbSummary, err := syncDB.GetSummary()
if err != nil {
log.Errorf("Error getting sync DB summary for update scan: %v", err)
return
}
latestPublishedReleaseTime = dbSummary.LatestPublished
latestKnownReleaseTime = dbSummary.LatestKnown
}
log.Debugf("Latest known release time: %d", latestKnownReleaseTime)
for result := range videoSource.Scan(latestKnownReleaseTime) {
if result.Error != nil {
log.Errorf("Error while discovering new videos from source: %v", result.Error)
} else {
syncDB.SaveKnownVideo(*result.Video)
}
}
log.Debugf("Latest published release time: %d", latestPublishedReleaseTime)
for result := range publisher.PublishedVideoIterator(latestPublishedReleaseTime) {
if result.Error != nil {
log.Errorf("Error while discovering published videos: %v", result.Error)
} else {
syncDB.SavePublishedVideo(*result.Video)
}
}
var videoIDs []string
if syncContext.VideoID == "" {
videoIDs, err = syncDB.GetUnpublishedIDs(videoSource.SourceName())
if err != nil {
log.Errorf("Error getting unpublished videos from sync DB: %v", err)
return
}
} else {
videoIDs = []string{ syncContext.VideoID }
}
log.Debugf("Syncing videos: %v", videoIDs)
for _, videoID := range videoIDs {
err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID)
if err != nil {
log.Errorf("Error syncing %s: %v", videoID, err)
return
}
}
log.Info("Done")
}
func cacheVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, videoID string) (*PublishableVideo, error) {
log.Debugf("Ensuring video %s:%s is cached", videoSource.SourceName(), videoID)
videoRecord, err := syncDB.GetVideoRecord(videoSource.SourceName(), videoID, true, true)
if err != nil {
log.Errorf("Error checking if video is already cached: %v", err)
return nil, err
}
if videoRecord != nil && videoRecord.FullLocalPath.Valid {
video := videoRecord.ToPublishableVideo()
if video != nil {
log.Debugf("%s:%s is already cached.", videoSource.SourceName(), videoID)
return video, nil
}
log.Warnf("%s:%s appears to be cached locally, but has missing data. Caching again.", videoSource.SourceName(), videoID)
}
log.Debugf("%s:%s is not cached locally. Caching now.", videoSource.SourceName(), videoID)
sourceVideo, err := videoSource.GetVideo(videoID)
if err != nil {
log.Errorf("Error getting source video: %v", err)
return nil, err
}
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
if err != nil {
log.Errorf("Error processing source video for publishing: %v", err)
return nil, err
}
err = syncDB.SavePublishableVideo(*processedVideo)
if err != nil {
log.Errorf("Error saving video data: %v", err)
return nil, err
}
return processedVideo, nil
}
func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error {
log.Debugf("Running sync for video %s:%s", videoSource.SourceName(), videoID)
isSynced, claimID, err := syncDB.IsVideoPublished(videoSource.SourceName(), videoID)
if err != nil {
log.Errorf("Error checking if video is already synced: %v", err)
return err
}
if isSynced {
log.Infof("Video %s:%s is already published as %s.", videoSource.SourceName(), videoID, claimID)
return nil
}
processedVideo, err := cacheVideo(syncContext, syncDB, videoSource, videoID)
if err != nil {
log.Errorf("Error ensuring video is cached prior to publication: %v", err)
return err
}
if syncContext.DryRun {
log.Infoln("This is a dry run. Nothing will be published.")
log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
log.Debugf("Object to be published: %v", processedVideo)
} else {
claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
if err != nil {
log.Errorf("Error publishing video: %v", err)
return err
}
err = syncDB.SavePublishedVideo((*processedVideo).ToPublished(claimID))
if err != nil {
// Sync DB is corrupted after getting here
// and will allow double publication.
log.Errorf("Error saving video publication to sync DB: %v", err)
return err
}
if syncContext.ReflectStreams {
err = <-doneReflectingCh
if err != nil {
log.Errorf("Error while wating for stream to reflect: %v", err)
return err
}
} else {
log.Debugln("Not waiting for stream to reflect.")
}
}
if !syncContext.KeepCache {
log.Infof("Deleting local files.")
err = syncDB.MarkVideoUncached(videoSource.SourceName(), videoID)
if err != nil {
log.Errorf("Error marking video %s:%s as uncached in syncDB", videoSource.SourceName(), videoID)
return err
}
err = videoSource.DeleteLocalCache(videoID)
if err != nil {
log.Errorf("Error deleting local files for video %s: %v", videoID, err)
return err
}
}
return nil
}
type SourceVideo struct {
ID string
Source string
Title *string
Description *string
SourceURL string
Languages []string
Tags []string
ReleaseTime *int64
ThumbnailURL *string
FullLocalPath *string
}
type PublishableVideo struct {
ID string
Source string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
}
func (v PublishableVideo) ToPublished(claimID string) PublishedVideo {
return PublishedVideo {
ClaimID: claimID,
NativeID: v.ID,
Source: v.Source,
ClaimName: v.ClaimName,
Title: v.Title,
Description: v.Description,
SourceURL: v.SourceURL,
Languages: v.Languages,
Tags: v.Tags,
ReleaseTime: v.ReleaseTime,
ThumbnailURL: v.ThumbnailURL,
FullLocalPath: v.FullLocalPath,
}
}
type PublishedVideo struct {
ClaimID string
NativeID string
Source string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
}
func (v PublishedVideo) ToPublishable() PublishableVideo {
return PublishableVideo {
ID: v.NativeID,
Source: v.Source,
ClaimName: v.ClaimName,
Title: v.Title,
Description: v.Description,
SourceURL: v.SourceURL,
Languages: v.Languages,
Tags: v.Tags,
ReleaseTime: v.ReleaseTime,
ThumbnailURL: v.ThumbnailURL,
FullLocalPath: v.FullLocalPath,
}
}
func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
if source.FullLocalPath == nil {
return nil, errors.New("Video is not cached locally")
}
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
if err != nil {
log.Errorf("Error sanitizing tags: %v", err)
return nil, err
}
descriptionSample := ""
if source.Description != nil {
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
}
info := whatlanggo.Detect(descriptionSample)
title := ""
if source.Title != nil {
title = *source.Title
}
info2 := whatlanggo.Detect(title)
var languages []string = nil
if info.IsReliable() && info.Lang.Iso6391() != "" {
language := info.Lang.Iso6391()
languages = []string{language}
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
language := info2.Lang.Iso6391()
languages = []string{language}
}
claimName := namer.NewNamer().GetNextName(title)
thumbnailURL := source.ThumbnailURL
if thumbnailURL == nil {
thumbnailURL = util.PtrToString("")
}
releaseTime := source.ReleaseTime
if releaseTime == nil {
releaseTime = util.PtrToInt64(time.Now().Unix())
}
processed := PublishableVideo {
ID: source.ID,
Source: source.Source,
ClaimName: claimName,
Title: title,
Description: getAbbrevDescription(source),
Languages: languages,
Tags: tags,
ReleaseTime: *releaseTime,
ThumbnailURL: *thumbnailURL,
FullLocalPath: *source.FullLocalPath,
}
log.Debugf("Video prepared for publication: %v", processed)
return &processed, nil
}
func getAbbrevDescription(v SourceVideo) string {
if v.Description == nil {
return v.SourceURL
}
additionalDescription := "\n...\n" + v.SourceURL
maxLength := 2800 - len(additionalDescription)
description := strings.TrimSpace(*v.Description)
if len(description) > maxLength {
description = description[:maxLength]
}
return description + additionalDescription
}
type VideoSource interface {
SourceName() string
GetVideo(id string) (*SourceVideo, error)
DeleteLocalCache(id string) error
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
}
type SourceScanIteratorResult struct {
Video *SourceVideo
Error error
}
type VideoPublisher interface {
Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error)
PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult
}
type PublishedVideoIteratorResult struct {
Video *PublishedVideo
Error error
}

local/localSDKPublisher.go (new file, 190 lines)

@@ -0,0 +1,190 @@
package local
import (
"errors"
"sort"
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type LocalSDKPublisher struct {
channelID string
publishBid float64
lbrynet *jsonrpc.Client
}
func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
lbrynet := jsonrpc.NewClient(sdkAddr)
lbrynet.SetRPCTimeout(5 * time.Minute)
status, err := lbrynet.Status()
if err != nil {
return nil, err
}
if !status.IsRunning {
return nil, errors.New("SDK is not running")
}
// Should check to see if the SDK owns the channel
// Should check to see if wallet is unlocked
// but jsonrpc.Client doesn't have WalletStatus method
// so skip for now
// Should check to see if streams are configured to be reflected and warn if not
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
// so use File.UploadingToReflector as a proxy for now
publisher := LocalSDKPublisher {
channelID: channelID,
publishBid: publishBid,
lbrynet: lbrynet,
}
return &publisher, nil
}
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) {
streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &video.Title,
Description: &video.Description,
Languages: video.Languages,
ThumbnailURL: &video.ThumbnailURL,
Tags: video.Tags,
},
ReleaseTime: &video.ReleaseTime,
ChannelID: &p.channelID,
License: util.PtrToString("Copyrighted (contact publisher)"),
}
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
if err != nil {
return "", nil, err
}
var claimID *string
for _, output := range txSummary.Outputs {
if output.Type == "claim" {
claimID = &output.ClaimID
break
}
}
if claimID == nil {
return "", nil, errors.New("Publish transaction did not have a claim output.")
}
if !reflectStream {
return *claimID, nil, nil
}
done := make(chan error, 1)
go func() {
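// Poll the SDK's file list until the stream reports as fully reflected;
// bail out if the file can't be found or stops uploading to a reflector.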
for {
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
if err != nil {
log.Errorf("Error finding file by txid: %v", err)
done <- err
return
}
if fileListResponse == nil {
log.Errorf("Could not find file in list with correct txid")
done <- errors.New("could not find file in list with correct txid")
return
}
fileStatus := fileListResponse.Items[fileIndex]
if fileStatus.IsFullyReflected {
log.Info("Stream is fully reflected")
break
}
if !fileStatus.UploadingToReflector {
log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
done <- errors.New("Stream is not being reflected (check lbrynet settings).")
return
}
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
time.Sleep(5 * time.Second)
}
done <- nil
}()
return *claimID, done, nil
}
func (p *LocalSDKPublisher) PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult {
videoCh := make(chan PublishedVideoIteratorResult, 10)
go func() {
defer close(videoCh)
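// Page through the SDK's stream list, emitting only claims that belong to
// this channel and were released at or after sinceTimestamp.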
for page := uint64(0); ; page++ {
streams, err := p.lbrynet.StreamList(nil, page, 100)
if err != nil {
log.Errorf("Error listing streams (page %d): %v", page, err)
errResult := PublishedVideoIteratorResult {
Error: err,
}
videoCh <- errResult
return
}
if len(streams.Items) == 0 {
return
}
for _, stream := range streams.Items {
if stream.ChannelID != p.channelID || stream.Value.GetStream().ReleaseTime < sinceTimestamp {
continue
}
languages := []string{}
for _, language := range stream.Value.Languages {
languages = append(languages, language.String())
}
video := PublishedVideo {
ClaimID: stream.ClaimID,
NativeID: "",
Source: "",
ClaimName: stream.Name,
Title: stream.Value.Title,
Description: stream.Value.Description,
Languages: languages,
Tags: stream.Value.Tags,
ReleaseTime: stream.Value.GetStream().ReleaseTime,
ThumbnailURL: stream.Value.Thumbnail.Url,
FullLocalPath: "",
}
videoResult := PublishedVideoIteratorResult {
Video: &video,
}
videoCh <- videoResult
}
}
}()
return videoCh
}
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20)
for {
if err != nil {
log.Errorf("Error getting file list page: %v", err)
return nil, 0, err
}
for index, item := range response.Items {
if item.Txid == txid {
return response, index, nil
}
}
if response.Page >= response.TotalPages {
return nil, 0, nil
}
response, err = client.FileList(response.Page + 1, 20)
}
}

local/readme.md (new file, 69 lines)

@@ -0,0 +1,69 @@
# Running ytsync locally
## Requirements
- LBRY SDK (what do we actually need this for?)
- youtube-dl (the code in this PR actually shells out to yt-dlp)
- enough space to cache stuff
- YouTube data API key
## Process
### Ensuring requirements are met
- claim channel if there isn't one yet
- or easier, just error if no channel
- enough lbc in wallet?
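
A minimal sketch of the kind of pre-flight check the publisher in this PR already performs (mirroring `NewLocalSDKPublisher` in `local/localSDKPublisher.go`; the address below is only an example):

```go
package main

import (
	"log"
	"time"

	"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
)

func main() {
	// Example address; point this at your own lbrynet JSONRPC endpoint.
	lbrynet := jsonrpc.NewClient("http://localhost:5279")
	lbrynet.SetRPCTimeout(5 * time.Minute)

	status, err := lbrynet.Status()
	if err != nil {
		log.Fatalf("could not reach the SDK: %v", err)
	}
	if !status.IsRunning {
		log.Fatal("SDK is not running")
	}
	// Channel ownership and LBC balance checks are still open items (see notes above).
	log.Println("SDK is running")
}
```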
### Getting a YouTube API key
To access the YouTube data API, you will first need some kind of google account.
The API has two methods of authentication, OAuth2 and API keys. This application uses API keys.
These API keys are basically like passwords, and so once obtained, they should not be shared.
The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application):
1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console.
2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**.
To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys).
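
Once you have a key, the `local` subcommand added in this PR reads it from the `YOUTUBE_API_KEY` environment variable by default, and the `--youtube-api-key` flag overrides it. A minimal sketch of that env-or-default lookup, mirroring `getEnvDefault` in `local/local.go`:

```go
package main

import (
	"fmt"
	"os"
)

// Prefer the environment variable; fall back to the provided default.
func getEnvDefault(key, defaultValue string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return defaultValue
}

func main() {
	apiKey := getEnvDefault("YOUTUBE_API_KEY", "")
	fmt.Println("YouTube API key configured:", apiKey != "")
}
```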
### Options to figure out what's already synced
- simplest: assume nothing is synced yet
- assume everything before some video is synced
- get/put sync info from Odysee by proving you have private key for channel
- tag videos as having been synced from youtube so we can ensure accuracy
- hardest: scan channel and try to match up which videos are not synced yet
### Central DB
- prove you have a channel's private key to get info about that channel
- proper queue instead of sleeping for N minutes between syncs
### Syncing a single video
- downloading it
- thumbnails
- metadata
- having enough LBC for publish(es)
- automated error handling
- getting a human involved for errors that can't be handled automatically
- reflecting
### Continuous Sync
- running in background
- storing local state
- interactions with our central ytsync db
- dealing with yt throttling
### Debugging
- dry-running the whole thing

local/syncDb.go (new file, 676 lines)

@@ -0,0 +1,676 @@
package local
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
log "github.com/sirupsen/logrus"
)
type SyncDb struct {
db *sql.DB
}
type SyncDbSummary struct {
Total int
CachedUnpublished int
UncachedUnpublished int
LatestKnown int64
LatestPublished int64
}
func NewSyncDb(path string) (*SyncDb, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
log.Errorf("Error opening cache DB at %s: %v", path, err)
return nil, err
}
cache := SyncDb {
db: db,
}
err = cache.ensureSchema()
if err != nil {
log.Errorf("Error while ensuring sync DB structure: %v", err)
return nil, err
}
return &cache, nil
}
func (c *SyncDb) Close() error {
return c.db.Close()
}
func (c *SyncDb) SaveKnownVideo(video SourceVideo) error {
if video.ID == "" {
log.Warnf("Trying to save a video with no ID: %v", video)
}
insertSql := `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO NOTHING;
`
r := SyncRecordFromSourceVideo(video)
_, err := c.db.Exec(
insertSql,
r.Source,
r.NativeID,
r.Title,
r.Description,
r.SourceURL,
r.ReleaseTime,
r.ThumbnailURL,
r.FullLocalPath,
)
return err
}
func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.ID,
video.ClaimName,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.ID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
return err
}
func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
claim_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_id = excluded.claim_id,
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.NativeID,
video.ClaimID,
video.ClaimName,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.NativeID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.NativeID, video.Languages)
return err
}
func (c *SyncDb) MarkVideoUncached(source, id string) error {
updateSql := `
UPDATE videos
SET full_local_path = NULL
WHERE source = ? AND native_id = ?;
`
_, err := c.db.Exec(
updateSql,
source,
id,
)
return err
}
func (c *SyncDb) _SaveVideoData(video SourceVideo) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.ID,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.ID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
return err
}
func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path,
claim_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path,
claim_id = excluded.claim_id;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.ID,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
claimID,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.ID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
return err
}
func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) {
selectSql := `
SELECT
claim_id
FROM videos
WHERE source = ? AND native_id = ?
`
row := c.db.QueryRow(selectSql, source, id)
var claimID sql.NullString
err := row.Scan(&claimID)
if err == sql.ErrNoRows {
return false, "", nil
} else if err != nil {
log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err)
return false, "", err
}
if claimID.Valid {
return true, claimID.String, nil
} else {
return false, "", nil
}
}
func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) {
selectSql := `
SELECT
full_local_path
FROM videos
WHERE source = ? AND native_id = ?
`
row := c.db.QueryRow(selectSql, source, id)
var localPath sql.NullString
err := row.Scan(&localPath)
if err == sql.ErrNoRows {
return false, "", nil
} else if err != nil {
log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err)
return false, "", err
}
if localPath.Valid {
return true, localPath.String, nil
} else {
return false, "", nil
}
}
func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) {
selectSql := `
SELECT
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path,
claim_id
FROM videos
WHERE source = ? AND native_id = ?
`
row := c.db.QueryRow(selectSql, source, id)
var record SyncRecord
err := row.Scan(
&record.NativeID,
&record.Title,
&record.Description,
&record.SourceURL,
&record.ReleaseTime,
&record.ThumbnailURL,
&record.FullLocalPath,
&record.ClaimID,
)
if err == sql.ErrNoRows {
log.Debugf("Data for %s:%s is not in the sync DB", source, id)
return nil, nil
} else if err != nil {
log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err)
return nil, err
}
if includeTags {
tags, err := c.getTags(source, id)
if err != nil {
return nil, err
}
record.Tags = &tags
}
if includeLanguages {
languages, err := c.getLanguages(source, id)
if err != nil {
return nil, err
}
record.Languages = &languages
}
return &record, nil
}
func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) {
selectSql := `
SELECT
native_id
FROM videos
WHERE source = ? AND claim_id IS NULL
`
ids := []string{}
rows, err := c.db.Query(selectSql, source)
if err != nil {
return ids, err
}
defer rows.Close()
for rows.Next() {
var id string
err = rows.Scan(&id)
if err != nil {
return ids, err
}
ids = append(ids, id)
}
return ids, nil
}
func (c *SyncDb) GetSummary() (*SyncDbSummary, error) {
selectSql := `
SELECT
COUNT() AS total,
COUNT(v_unpub.full_local_path) AS cached_unpublished,
COUNT(v_all.claim_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished,
MAX(v_all.release_time) AS latest_known,
MAX(v_pub.release_time) AS latest_published
FROM videos v_all
LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL
LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL
`
row := c.db.QueryRow(selectSql)
var summary SyncDbSummary
var latestKnown, latestPublished sql.NullInt64
err := row.Scan(
&summary.Total,
&summary.CachedUnpublished,
&summary.UncachedUnpublished,
&latestKnown,
&latestPublished,
)
if err != nil {
log.Errorf("Error querying sync DB summary: %v", err)
return nil, err
}
if latestKnown.Valid {
summary.LatestKnown = latestKnown.Int64
}
if latestPublished.Valid {
summary.LatestPublished = latestPublished.Int64
}
return &summary, nil
}
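// ensureSchema creates the videos table (keyed by source + native_id) plus the
// video_tags and video_languages join tables if they do not already exist.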
func (c *SyncDb) ensureSchema() error {
createSql := `
CREATE TABLE IF NOT EXISTS videos (
source TEXT,
native_id TEXT,
title TEXT,
description TEXT,
source_url TEXT,
release_time INT,
thumbnail_url TEXT,
full_local_path TEXT,
claim_name TEXT,
claim_id TEXT,
PRIMARY KEY (source, native_id)
);
CREATE TABLE IF NOT EXISTS video_tags (
source TEXT NOT NULL,
native_id TEXT NOT NULL,
tag TEXT NOT NULL,
UNIQUE (source, native_id, tag)
);
CREATE TABLE IF NOT EXISTS video_languages (
source TEXT NOT NULL,
native_id TEXT NOT NULL,
language TEXT NOT NULL,
UNIQUE (source, native_id, language)
);
`
_, err := c.db.Exec(createSql)
return err
}
func (c *SyncDb) upsertTags(source, id string, tags []string) error {
upsertSql := `
INSERT INTO video_tags (
source,
native_id,
tag
) VALUES (?, ?, ?)
ON CONFLICT (source, native_id, tag)
DO NOTHING;
`
for _, tag := range tags {
_, err := c.db.Exec(
upsertSql,
source,
id,
tag,
)
if err != nil {
log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err)
return err
}
}
return nil
}
func (c *SyncDb) getTags(source, id string) ([]string, error) {
selectSql := `
SELECT tag
FROM video_tags
WHERE source = ? AND native_id = ?;
`
rows, err := c.db.Query(selectSql, source, id)
if err != nil {
log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
defer rows.Close()
var tags []string
for rows.Next() {
var tag string
err = rows.Scan(&tag)
if err != nil {
log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
tags = append(tags, tag)
}
return tags, nil
}
func (c *SyncDb) upsertLanguages(source, id string, languages []string) error {
upsertSql := `
INSERT INTO video_languages (
source,
native_id,
language
) VALUES (?, ?, ?)
ON CONFLICT (source, native_id, language)
DO NOTHING;
`
for _, language := range languages {
_, err := c.db.Exec(
upsertSql,
source,
id,
language,
)
if err != nil {
log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err)
return err
}
}
return nil
}
func (c *SyncDb) getLanguages(source, id string) ([]string, error) {
selectSql := `
SELECT language
FROM video_languages
WHERE source = ? AND native_id = ?;
`
rows, err := c.db.Query(selectSql, source, id)
if err != nil {
log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
defer rows.Close()
var languages []string
for rows.Next() {
var language string
err = rows.Scan(&language)
if err != nil {
log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
languages = append(languages, language)
}
return languages, nil
}
type SyncRecord struct {
Source string
NativeID string
Title sql.NullString
Description sql.NullString
SourceURL sql.NullString
ReleaseTime sql.NullInt64
ThumbnailURL sql.NullString
FullLocalPath sql.NullString
ClaimID sql.NullString
Tags *[]string
Languages *[]string
}
func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord {
r := SyncRecord {
Source: v.Source,
NativeID: v.ID,
SourceURL: sql.NullString { String: v.SourceURL, Valid: true },
}
if v.Title != nil {
r.Title = sql.NullString { String: *v.Title, Valid: true }
}
if v.Description != nil {
r.Description = sql.NullString { String: *v.Description, Valid: true }
}
if v.ThumbnailURL != nil {
r.ThumbnailURL = sql.NullString { String: *v.ThumbnailURL, Valid: true }
}
if v.FullLocalPath != nil {
r.FullLocalPath = sql.NullString { String: *v.FullLocalPath, Valid: true }
}
if v.ReleaseTime != nil {
r.ReleaseTime = sql.NullInt64 { Int64: *v.ReleaseTime, Valid: true }
}
if len(v.Tags) > 0 {
r.Tags = &v.Tags
}
if len(v.Languages) > 0 {
r.Languages = &v.Languages
}
return r
}
func (r *SyncRecord) ToPublishableVideo() *PublishableVideo {
if !(r.Title.Valid &&
r.Description.Valid &&
r.SourceURL.Valid &&
r.ReleaseTime.Valid &&
r.ThumbnailURL.Valid &&
r.FullLocalPath.Valid &&
r.Tags != nil &&
r.Languages != nil) {
return nil
}
video := PublishableVideo {
ID: r.NativeID,
Source: r.Source,
Description: r.Description.String,
SourceURL: r.SourceURL.String,
ReleaseTime: r.ReleaseTime.Int64,
ThumbnailURL: r.ThumbnailURL.String,
FullLocalPath: r.FullLocalPath.String,
Tags: *r.Tags,
Languages: *r.Languages,
}
return &video
}

(new file, 51 lines)

@@ -0,0 +1,51 @@
package local
type YouTubeChannelScanner interface {
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
}
type YouTubeAPIChannelScanner struct {
api *YouTubeAPI
channel string
}
func NewYouTubeAPIChannelScanner(apiKey, channel string) (*YouTubeAPIChannelScanner) {
scanner := YouTubeAPIChannelScanner {
api: NewYouTubeAPI(apiKey),
channel: channel,
}
return &scanner
}
func (s *YouTubeAPIChannelScanner) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
videoCh := make(chan SourceScanIteratorResult, 10)
go func() {
defer close(videoCh)
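// Page through the YouTube search API; firstRun guarantees at least one request,
// and the loop stops once no nextPage token is returned.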
for firstRun, nextPage := true, ""; firstRun || nextPage != ""; {
var videos []SourceVideo
var err error
firstRun = false
videos, nextPage, err = s.api.GetChannelVideosPage(s.channel, sinceTimestamp, nextPage)
if err != nil {
videoCh <- SourceScanIteratorResult {
Video: nil,
Error: err,
}
return
}
for _, video := range videos {
outVideo := video
videoCh <- SourceScanIteratorResult {
Video: &outVideo,
Error: nil,
}
}
}
}()
return videoCh
}

local/youtubeEnricher.go (new file, 74 lines)

@@ -0,0 +1,74 @@
package local
import (
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type YouTubeVideoEnricher interface {
EnrichMissing(source *SourceVideo) error
}
type YouTubeAPIVideoEnricher struct {
api *YouTubeAPI
}
func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) {
enricher := YouTubeAPIVideoEnricher{
api: NewYouTubeAPI(apiKey),
}
return &enricher
}
func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
if source.ReleaseTime != nil {
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
return nil
}
snippet, err := e.api.GetVideoSnippet(source.ID)
if err != nil {
log.Errorf("Error snippet data for video %s: %v", err)
return err
}
publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt)
if err != nil {
log.Errorf("Error converting publishedAt to timestamp: %v", err)
} else {
source.ReleaseTime = util.PtrToInt64(publishedAt.Unix())
}
return nil
}
type CacheVideoEnricher struct {
syncDB *SyncDb
}
func NewCacheVideoEnricher(syncDB *SyncDb) *CacheVideoEnricher {
enricher := CacheVideoEnricher {
syncDB,
}
return &enricher
}
func (e *CacheVideoEnricher) EnrichMissing(source *SourceVideo) error {
if source.ReleaseTime != nil {
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
return nil
}
cached, err := e.syncDB.GetVideoRecord(source.Source, source.ID, false, false)
if err != nil {
log.Errorf("Error getting cached video %s: %v", source.ID, err)
return err
}
if cached != nil && cached.ReleaseTime.Valid {
source.ReleaseTime = &cached.ReleaseTime.Int64
}
return nil
}

local/ytapi.go (new file, 157 lines)

@@ -0,0 +1,157 @@
package local
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type YouTubeAPI struct {
apiKey string
client *http.Client
}
func NewYouTubeAPI(apiKey string) (*YouTubeAPI) {
client := &http.Client {
Transport: &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
DisableCompression: true,
},
}
api := YouTubeAPI {
apiKey: apiKey,
client: client,
}
return &api
}
func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil)
if err != nil {
log.Errorf("Error creating http client for YouTube API: %v", err)
return nil, err
}
query := req.URL.Query()
query.Add("part", "snippet")
query.Add("id", videoID)
query.Add("key", a.apiKey)
req.URL.RawQuery = query.Encode()
req.Header.Add("Accept", "application/json")
resp, err := a.client.Do(req)
if err != nil {
log.Errorf("Error from YouTube API: %v", err)
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Errorf("Error reading YouTube API response body: %v", err)
return nil, err
}
log.Tracef("Response from YouTube API: %s", string(body))
var result videoListResponse
err = json.Unmarshal(body, &result)
if err != nil {
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
return nil, err
}
if len(result.Items) != 1 {
err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID)
return nil, err
}
return &result.Items[0].Snippet, nil
}
func (a *YouTubeAPI) GetChannelVideosPage(channelID string, publishedAfter int64, pageToken string) ([]SourceVideo, string, error) {
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/search", nil)
if err != nil {
log.Errorf("Error creating http client for YouTube API: %v", err)
return []SourceVideo{}, "", err
}
query := req.URL.Query()
query.Add("part", "snippet")
query.Add("type", "video")
query.Add("channelId", channelID)
query.Add("publishedAfter", time.Unix(publishedAfter, 0).Format(time.RFC3339))
query.Add("maxResults", "5")
if pageToken != "" {
query.Add("pageToken", pageToken)
}
query.Add("key", a.apiKey)
req.URL.RawQuery = query.Encode()
req.Header.Add("Accept", "application/json")
resp, err := a.client.Do(req)
if err != nil {
log.Errorf("Error from YouTube API: %v", err)
return []SourceVideo{}, "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Errorf("Error reading YouTube API response body: %v", err)
return []SourceVideo{}, "", err
}
log.Tracef("Response from YouTube API: %s", string(body))
var result videoSearchResponse
err = json.Unmarshal(body, &result)
if err != nil {
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
return []SourceVideo{}, "", err
}
videos := []SourceVideo{}
for _, item := range result.Items {
var releaseTime *int64
publishedAt, err := time.Parse(time.RFC3339, item.Snippet.PublishedAt)
if err != nil {
log.Errorf("Unable to parse publish time of %s while scanning YouTube channel %s: %v", item.ID.VideoID, channelID)
releaseTime = nil
} else {
releaseTime = util.PtrToInt64(publishedAt.Unix())
}
video := SourceVideo {
ID: item.ID.VideoID,
Source: "YouTube",
ReleaseTime: releaseTime,
}
videos = append(videos, video)
}
return videos, result.NextPageToken, nil
}
type videoListResponse struct {
NextPageToken string `json:"nextPageToken"`
Items []struct {
ID string `json:"id"`
Snippet VideoSnippet `json:"snippet"`
} `json:"items"`
}
type videoSearchResponse struct {
NextPageToken string `json:"nextPageToken"`
Items []struct {
ID struct{
VideoID string `json:"videoId"`
} `json:"id"`
Snippet VideoSnippet `json:"snippet"`
} `json:"items"`
}
type VideoSnippet struct {
PublishedAt string `json:"publishedAt"`
}

local/ytdl.go (new file, 239 lines)

@@ -0,0 +1,239 @@
package local
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"strings"
log "github.com/sirupsen/logrus"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
)
type Ytdl struct {
DownloadDir string
}
func NewYtdl(downloadDir string) (*Ytdl, error) {
// TODO validate download dir
y := Ytdl {
DownloadDir: downloadDir,
}
return &y, nil
}
func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) {
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
return nil, err
}
metadataBytes, err := os.ReadFile(metadataPath)
if err != nil {
return nil, err
}
var metadata *ytdl.YtdlVideo
err = json.Unmarshal(metadataBytes, &metadata)
if err != nil {
return nil, err
}
return metadata, nil
}
func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) {
basePath := path.Join(y.DownloadDir, videoID)
metadataPath := basePath + ".info.json"
_, err := os.Stat(metadataPath)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Error determining if video metadata already exists: %v", err)
return "", err
} else if err != nil {
log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID)
err = downloadVideoMetadata(basePath, videoID)
if err != nil {
return "", err
}
}
return metadataPath, nil
}
func (y *Ytdl) GetVideoFile(videoID string) (string, error) {
videoPath, err := findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
return "", err
}
if videoPath != nil {
return *videoPath, nil
}
basePath := path.Join(y.DownloadDir, videoID)
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
log.Errorf("Error getting metadata path in preparation for video download: %v", err)
return "", err
}
err = downloadVideo(basePath, metadataPath)
if err != nil {
return "", nil
}
videoPath, err = findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err)
return "", err
}
if videoPath == nil {
return "", errors.New("Could not find a downloaded video after successful download.")
}
return *videoPath, nil
}
func (y *Ytdl) DeleteVideoFiles(videoID string) error {
files, err := ioutil.ReadDir(y.DownloadDir)
if err != nil {
return err
}
for _, f := range files {
if f.IsDir() {
continue
}
if strings.Contains(f.Name(), videoID) {
videoPath := path.Join(y.DownloadDir, f.Name())
err = os.Remove(videoPath)
if err != nil {
log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err)
return err
}
}
}
return nil
}
func deleteFile(path string) error {
_, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Error determining if file %s exists: %v", path, err)
return err
} else if err != nil {
log.Debugf("File %s does not exist. Skipping deletion.", path)
return nil
}
return os.Remove(path)
}
func findDownloadedVideo(videoDir, videoID string) (*string, error) {
files, err := ioutil.ReadDir(videoDir)
if err != nil {
return nil, err
}
for _, f := range files {
if f.IsDir() {
continue
}
if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) {
videoPath := path.Join(videoDir, f.Name())
return &videoPath, nil
}
}
return nil, nil
}
func downloadVideoMetadata(basePath, videoID string) error {
ytdlArgs := []string{
"--skip-download",
"--write-info-json",
"--force-overwrites",
fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
"--cookies",
"cookies.txt",
"-o",
basePath,
}
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
log.Debug(output)
return err
}
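// downloadVideo replays the saved .info.json through yt-dlp and downloads an mp4
// (best <=720p video merged with audio), reusing cookies.txt for authentication.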
func downloadVideo(basePath, metadataPath string) error {
ytdlArgs := []string{
"--no-progress",
"-o",
basePath,
"--merge-output-format",
"mp4",
"--postprocessor-args",
"ffmpeg:-movflags faststart",
"--abort-on-unavailable-fragment",
"--fragment-retries",
"1",
"--cookies",
"cookies.txt",
"--extractor-args",
"youtube:player_client=android",
"--load-info-json",
metadataPath,
"-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]",
}
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
log.Debug(output)
return err
}
func runCmd(cmd *exec.Cmd) ([]string, error) {
log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
var err error
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
outLog, err := ioutil.ReadAll(stdout)
if err != nil {
return nil, err
}
errorLog, err := ioutil.ReadAll(stderr)
if err != nil {
return nil, err
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case err := <-done:
if err != nil {
log.Error(string(errorLog))
return nil, err
}
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
}
}

local/ytdlVideoSource.go (new file, 101 lines)

@@ -0,0 +1,101 @@
package local
import (
log "github.com/sirupsen/logrus"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
)
type YtdlVideoSource struct {
downloader Ytdl
channelScanner YouTubeChannelScanner
enrichers []YouTubeVideoEnricher
}
func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig, syncDB *SyncDb) (*YtdlVideoSource, error) {
ytdl, err := NewYtdl(downloadDir)
if err != nil {
return nil, err
}
source := YtdlVideoSource {
downloader: *ytdl,
}
if syncDB != nil {
source.enrichers = append(source.enrichers, NewCacheVideoEnricher(syncDB))
}
if config.APIKey != "" {
ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey)
source.enrichers = append(source.enrichers, ytapiEnricher)
source.channelScanner = NewYouTubeAPIChannelScanner(config.APIKey, config.ChannelID)
}
if source.channelScanner == nil {
log.Warnf("No means of scanning source channels has been provided")
}
return &source, nil
}
func (s *YtdlVideoSource) SourceName() string {
return "YouTube"
}
func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
metadata, err := s.downloader.GetVideoMetadata(id)
if err != nil {
return nil, err
}
videoPath, err := s.downloader.GetVideoFile(id)
if err != nil {
return nil, err
}
var bestThumbnail *ytdl.Thumbnail = nil
for i, thumbnail := range metadata.Thumbnails {
if bestThumbnail == nil || bestThumbnail.Width < thumbnail.Width {
bestThumbnail = &metadata.Thumbnails[i]
}
}
sourceVideo := SourceVideo {
ID: id,
Source: "YouTube",
Title: &metadata.Title,
Description: &metadata.Description,
SourceURL: "https://www.youtube.com/watch?v=" + id,
Languages: []string{},
Tags: metadata.Tags,
ReleaseTime: nil,
ThumbnailURL: &bestThumbnail.URL,
FullLocalPath: &videoPath,
}
for _, enricher := range s.enrichers {
err = enricher.EnrichMissing(&sourceVideo)
if err != nil {
log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err)
}
}
log.Debugf("Source video retrieved via ytdl: %v", sourceVideo)
return &sourceVideo, nil
}
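// DeleteLocalCache removes the files previously downloaded for the given video id.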
func (s *YtdlVideoSource) DeleteLocalCache(id string) error {
return s.downloader.DeleteVideoFiles(id)
}
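// Scan delegates to the configured channel scanner; when none is configured it returns an already-closed channel so callers see an empty scan.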
func (s *YtdlVideoSource) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
if s.channelScanner != nil {
return s.channelScanner.Scan(sinceTimestamp)
}
videoCh := make(chan SourceScanIteratorResult, 1)
close(videoCh)
return videoCh
}
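For orientation, a minimal sketch of how a caller might wire up this source; the directory, API key, channel ID, and video ID below are placeholders, not values from this change:
// Hypothetical usage; assumes the local package types shown above.
source, err := NewYtdlVideoSource("/tmp/ytsync-videos", &YouTubeSourceConfig{
APIKey: "YOUTUBE_API_KEY",
ChannelID: "UCxxxxxxxxxxxxxxxxxxxxxx",
}, nil)
if err != nil {
log.Fatal(err)
}
video, err := source.GetVideo("dQw4w9WgXcQ")
if err != nil {
log.Fatal(err)
}
log.Infof("fetched %s from %s", *video.Title, video.Source)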

158
main.go

@@ -1,172 +1,24 @@
package main
import (
"fmt"
"math/rand"
"net/http"
"os"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/cmd"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var Version string
const defaultMaxTries = 3
var (
cliFlags shared.SyncFlags
maxVideoLength int
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel)
// register the metrics handler once and serve it in the background; a second Handle call for the same pattern would panic
http.Handle("/metrics", promhttp.Handler())
go func() {
log.Error(http.ListenAndServe(":2112", nil))
}()
cmd := &cobra.Command{
Use: "ytsync",
Short: "Publish youtube channels into LBRY network automatically.",
Run: ytSync,
Args: cobra.RangeArgs(0, 0),
}
cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
cmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
cmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time] (Default: 0)")
cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time] (Default: one year from now)")
cmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
cmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 0, "how many videos to process per channel (leave 0 for automatic detection)")
cmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
cmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
if err := cmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
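// ytSync validates the CLI flags and required environment variables, assembles the API and AWS configuration, and runs the sync manager until it stops.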
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return
}
if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if cliFlags.Limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
cliFlags,
blobsDir,
lbrycrdDsn,
awsConfig,
apiConfig,
)
err := sm.Start()
if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err))
}
ytUtils.SendInfoToSlack("Syncing process terminated!")
cmd.Execute()
}


@@ -58,9 +58,12 @@ func (s *SyncManager) Start() error {
}
}
var lastChannelProcessed string
var secondLastChannelProcessed string
syncCount := 0
var (
lastChannelProcessed string
secondLastChannelProcessed string
syncCount int
)
for {
s.channelsToSync = make([]Sync, 0, 10) // reset sync queue
err := s.checkUsedSpace()
@@ -108,10 +111,12 @@ func (s *SyncManager) Start() error {
log.Infof("Drained the \"%s\" queue", q)
}
}
if len(s.channelsToSync) == 0 {
log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute)
}
for _, sync := range s.channelsToSync {
if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed {
util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId)
@@ -174,6 +179,7 @@ func (s *SyncManager) Start() error {
break
}
}
return nil
}