Tracking synced videos #115
11 changed files with 1949 additions and 5 deletions
1
go.mod
1
go.mod
|
@ -22,6 +22,7 @@ require (
|
||||||
github.com/kr/pretty v0.2.1 // indirect
|
github.com/kr/pretty v0.2.1 // indirect
|
||||||
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b
|
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b
|
||||||
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
|
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
|
||||||
|
github.com/mattn/go-sqlite3 v1.10.0
|
||||||
github.com/miekg/dns v1.1.22 // indirect
|
github.com/miekg/dns v1.1.22 // indirect
|
||||||
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
|
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
|
||||||
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
|
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -318,6 +318,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
|
||||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||||
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
|
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
|
||||||
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
||||||
|
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
|
||||||
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||||
|
|
446
local/local.go
446
local/local.go
|
@ -1,22 +1,460 @@
|
||||||
package local
|
package local
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"errors"
|
||||||
|
"os"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/abadojack/whatlanggo"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
"github.com/lbryio/ytsync/v5/namer"
|
||||||
|
"github.com/lbryio/ytsync/v5/tags_manager"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type SyncContext struct {
|
||||||
|
DryRun bool
|
||||||
|
KeepCache bool
|
||||||
|
ReflectStreams bool
|
||||||
|
ForceChannelScan bool
|
||||||
|
TempDir string
|
||||||
|
SyncDbPath string
|
||||||
|
LbrynetAddr string
|
||||||
|
ChannelID string
|
||||||
|
VideoID string
|
||||||
|
PublishBid float64
|
||||||
|
YouTubeSourceConfig *YouTubeSourceConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncContext) Validate() error {
|
||||||
|
if c.TempDir == "" {
|
||||||
|
return errors.New("No TempDir provided")
|
||||||
|
}
|
||||||
|
if c.SyncDbPath == "" {
|
||||||
|
return errors.New("No sync DB path provided")
|
||||||
|
}
|
||||||
|
if c.LbrynetAddr == "" {
|
||||||
|
return errors.New("No Lbrynet address provided")
|
||||||
|
}
|
||||||
|
if c.ChannelID == "" {
|
||||||
|
return errors.New("No channel ID provided")
|
||||||
|
}
|
||||||
|
if c.PublishBid <= 0.0 {
|
||||||
|
return errors.New("Publish bid is not greater than zero")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.YouTubeSourceConfig.ChannelID != "" {
|
||||||
|
// Validate for YouTube source
|
||||||
|
// For now, an API key is required
|
||||||
|
if c.YouTubeSourceConfig.APIKey == "" {
|
||||||
|
return errors.New("YouTube source was selected, but no YouTube API key was provided.")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return errors.New("No video source provided")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type YouTubeSourceConfig struct {
|
||||||
|
ChannelID string
|
||||||
|
APIKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
var syncContext SyncContext
|
||||||
|
|
||||||
func AddCommand(rootCmd *cobra.Command) {
|
func AddCommand(rootCmd *cobra.Command) {
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
Use: "local",
|
Use: "local",
|
||||||
Short: "run a personal ytsync",
|
Short: "run a personal ytsync",
|
||||||
Run: localCmd,
|
Run: localCmd,
|
||||||
|
Args: cobra.ExactArgs(0),
|
||||||
}
|
}
|
||||||
//cmd.Flags().StringVar(&cache, "cache", "", "path to cache")
|
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
|
||||||
rootCmd.AddCommand(cmd)
|
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
|
||||||
|
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
|
||||||
|
cmd.Flags().BoolVar(&syncContext.ForceChannelScan, "force-rescan", false, "Rescan channel to fill the sync DB.")
|
||||||
|
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
|
||||||
|
cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB")
|
||||||
|
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
|
||||||
|
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
|
||||||
|
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
|
||||||
|
cmd.Flags().StringVar(&syncContext.VideoID, "video-id", "", "ID of video to sync. This will attempt to sync only this one video.")
|
||||||
|
|
||||||
|
syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
|
||||||
|
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.APIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
|
||||||
|
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.ChannelID, "youtube-channel", "", "YouTube Channel ID")
|
||||||
|
rootCmd.AddCommand(cmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEnvDefault(key, defaultValue string) string {
|
||||||
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return defaultValue
|
||||||
}
|
}
|
||||||
|
|
||||||
func localCmd(cmd *cobra.Command, args []string) {
|
func localCmd(cmd *cobra.Command, args []string) {
|
||||||
fmt.Println("local")
|
err := syncContext.Validate()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
syncDB, err := NewSyncDb(syncContext.SyncDbPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error creating sync DB: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer syncDB.Close()
|
||||||
|
|
||||||
|
var publisher VideoPublisher
|
||||||
|
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error setting up publisher: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var videoSource VideoSource
|
||||||
|
if syncContext.YouTubeSourceConfig != nil {
|
||||||
|
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig, syncDB)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error setting up video source: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
latestPublishedReleaseTime := int64(0)
|
||||||
|
latestKnownReleaseTime := int64(0)
|
||||||
|
if syncContext.ForceChannelScan {
|
||||||
|
log.Infof("Channel scan is being forced.")
|
||||||
|
} else {
|
||||||
|
dbSummary, err := syncDB.GetSummary()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting sync DB summary for update scan: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
latestPublishedReleaseTime = dbSummary.LatestPublished
|
||||||
|
latestKnownReleaseTime = dbSummary.LatestKnown
|
||||||
|
}
|
||||||
|
log.Debugf("Latest known release time: %d", latestKnownReleaseTime)
|
||||||
|
for result := range videoSource.Scan(latestKnownReleaseTime) {
|
||||||
|
if result.Error != nil {
|
||||||
|
log.Errorf("Error while discovering new videos from source: %v", result.Error)
|
||||||
|
} else {
|
||||||
|
syncDB.SaveKnownVideo(*result.Video)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Debugf("Latest published release time: %d", latestPublishedReleaseTime)
|
||||||
|
for result := range publisher.PublishedVideoIterator(latestPublishedReleaseTime) {
|
||||||
|
if result.Error != nil {
|
||||||
|
log.Errorf("Error while discovering published videos: %v", result.Error)
|
||||||
|
} else {
|
||||||
|
syncDB.SavePublishedVideo(*result.Video)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var videoIDs []string
|
||||||
|
if syncContext.VideoID == "" {
|
||||||
|
videoIDs, err = syncDB.GetUnpublishedIDs(videoSource.SourceName())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting unpublished videos from sync DB: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
videoIDs = []string{ syncContext.VideoID }
|
||||||
|
}
|
||||||
|
log.Debugf("Syncing videos: %v", videoIDs)
|
||||||
|
for _, videoID := range videoIDs {
|
||||||
|
err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error syncing %s: %v", videoID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Done")
|
||||||
|
}
|
||||||
|
|
||||||
|
func cacheVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, videoID string) (*PublishableVideo, error) {
|
||||||
|
log.Debugf("Ensuring video %s:%s is cached", videoSource.SourceName(), videoID)
|
||||||
|
|
||||||
|
videoRecord, err := syncDB.GetVideoRecord(videoSource.SourceName(), videoID, true, true)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error checking if video is already cached: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if videoRecord != nil && videoRecord.FullLocalPath.Valid {
|
||||||
|
log.Debugf("%s:%s is already cached.", videoSource.SourceName(), videoID)
|
||||||
|
video := videoRecord.ToPublishableVideo()
|
||||||
|
if video == nil {
|
||||||
|
log.Warnf("%s:%s appears to be cached locally, but has missing data. Caching again.")
|
||||||
|
}
|
||||||
|
return video, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("%s:%s is not cached locally. Caching now.", videoSource.SourceName(), videoID)
|
||||||
|
sourceVideo, err := videoSource.GetVideo(videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting source video: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error processing source video for publishing: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = syncDB.SavePublishableVideo(*processedVideo)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error saving video data: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return processedVideo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error {
|
||||||
|
log.Debugf("Running sync for video %s:%s", videoSource.SourceName(), videoID)
|
||||||
|
|
||||||
|
isSynced, claimID, err := syncDB.IsVideoPublished(videoSource.SourceName(), videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error checking if video is already synced: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if isSynced {
|
||||||
|
log.Infof("Video %s:%s is already published as %s.", videoSource.SourceName(), videoID, claimID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
processedVideo, err := cacheVideo(syncContext, syncDB, videoSource, videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error ensuring video is cached prior to publication: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if syncContext.DryRun {
|
||||||
|
log.Infoln("This is a dry run. Nothing will be published.")
|
||||||
|
log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
|
||||||
|
log.Debugf("Object to be published: %v", processedVideo)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error publishing video: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = syncDB.SavePublishedVideo((*processedVideo).ToPublished(claimID))
|
||||||
|
if err != nil {
|
||||||
|
// Sync DB is corrupted after getting here
|
||||||
|
// and will allow double publication.
|
||||||
|
log.Errorf("Error saving video publication to sync DB: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if syncContext.ReflectStreams {
|
||||||
|
err = <-doneReflectingCh
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while wating for stream to reflect: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Debugln("Not waiting for stream to reflect.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !syncContext.KeepCache {
|
||||||
|
log.Infof("Deleting local files.")
|
||||||
|
err = syncDB.MarkVideoUncached(videoSource.SourceName(), videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error marking video %s:%s as uncached in syncDB", videoSource.SourceName(), videoID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = videoSource.DeleteLocalCache(videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error deleting local files for video %s: %v", videoID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type SourceVideo struct {
|
||||||
|
ID string
|
||||||
|
Source string
|
||||||
|
Title *string
|
||||||
|
Description *string
|
||||||
|
SourceURL string
|
||||||
|
Languages []string
|
||||||
|
Tags []string
|
||||||
|
ReleaseTime *int64
|
||||||
|
ThumbnailURL *string
|
||||||
|
FullLocalPath *string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PublishableVideo struct {
|
||||||
|
ID string
|
||||||
|
Source string
|
||||||
|
ClaimName string
|
||||||
|
Title string
|
||||||
|
Description string
|
||||||
|
SourceURL string
|
||||||
|
Languages []string
|
||||||
|
Tags []string
|
||||||
|
ReleaseTime int64
|
||||||
|
ThumbnailURL string
|
||||||
|
FullLocalPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v PublishableVideo) ToPublished(claimID string) PublishedVideo {
|
||||||
|
return PublishedVideo {
|
||||||
|
ClaimID: claimID,
|
||||||
|
NativeID: v.ID,
|
||||||
|
Source: v.Source,
|
||||||
|
ClaimName: v.ClaimName,
|
||||||
|
Title: v.Title,
|
||||||
|
Description: v.Description,
|
||||||
|
SourceURL: v.SourceURL,
|
||||||
|
Languages: v.Languages,
|
||||||
|
Tags: v.Tags,
|
||||||
|
ReleaseTime: v.ReleaseTime,
|
||||||
|
ThumbnailURL: v.ThumbnailURL,
|
||||||
|
FullLocalPath: v.FullLocalPath,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type PublishedVideo struct {
|
||||||
|
ClaimID string
|
||||||
|
NativeID string
|
||||||
|
Source string
|
||||||
|
ClaimName string
|
||||||
|
Title string
|
||||||
|
Description string
|
||||||
|
SourceURL string
|
||||||
|
Languages []string
|
||||||
|
Tags []string
|
||||||
|
ReleaseTime int64
|
||||||
|
ThumbnailURL string
|
||||||
|
FullLocalPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v PublishedVideo) ToPublishable() PublishableVideo {
|
||||||
|
return PublishableVideo {
|
||||||
|
ID: v.NativeID,
|
||||||
|
Source: v.Source,
|
||||||
|
ClaimName: v.ClaimName,
|
||||||
|
Title: v.Title,
|
||||||
|
Description: v.Description,
|
||||||
|
SourceURL: v.SourceURL,
|
||||||
|
Languages: v.Languages,
|
||||||
|
Tags: v.Tags,
|
||||||
|
ReleaseTime: v.ReleaseTime,
|
||||||
|
ThumbnailURL: v.ThumbnailURL,
|
||||||
|
FullLocalPath: v.FullLocalPath,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
|
||||||
|
if source.FullLocalPath == nil {
|
||||||
|
return nil, errors.New("Video is not cached locally")
|
||||||
|
}
|
||||||
|
|
||||||
|
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error sanitizing tags: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
descriptionSample := ""
|
||||||
|
if source.Description != nil {
|
||||||
|
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
|
||||||
|
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
|
||||||
|
}
|
||||||
|
info := whatlanggo.Detect(descriptionSample)
|
||||||
|
|
||||||
|
title := ""
|
||||||
|
if source.Title != nil {
|
||||||
|
title = *source.Title
|
||||||
|
}
|
||||||
|
info2 := whatlanggo.Detect(title)
|
||||||
|
var languages []string = nil
|
||||||
|
if info.IsReliable() && info.Lang.Iso6391() != "" {
|
||||||
|
language := info.Lang.Iso6391()
|
||||||
|
languages = []string{language}
|
||||||
|
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
|
||||||
|
language := info2.Lang.Iso6391()
|
||||||
|
languages = []string{language}
|
||||||
|
}
|
||||||
|
|
||||||
|
claimName := namer.NewNamer().GetNextName(title)
|
||||||
|
|
||||||
|
thumbnailURL := source.ThumbnailURL
|
||||||
|
if thumbnailURL == nil {
|
||||||
|
thumbnailURL = util.PtrToString("")
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseTime := source.ReleaseTime
|
||||||
|
if releaseTime == nil {
|
||||||
|
releaseTime = util.PtrToInt64(time.Now().Unix())
|
||||||
|
}
|
||||||
|
|
||||||
|
processed := PublishableVideo {
|
||||||
|
ID: source.ID,
|
||||||
|
Source: source.Source,
|
||||||
|
ClaimName: claimName,
|
||||||
|
Title: title,
|
||||||
|
Description: getAbbrevDescription(source),
|
||||||
|
Languages: languages,
|
||||||
|
Tags: tags,
|
||||||
|
ReleaseTime: *releaseTime,
|
||||||
|
ThumbnailURL: *thumbnailURL,
|
||||||
|
FullLocalPath: *source.FullLocalPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Video prepared for publication: %v", processed)
|
||||||
|
|
||||||
|
return &processed, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAbbrevDescription(v SourceVideo) string {
|
||||||
|
if v.Description == nil {
|
||||||
|
return v.SourceURL
|
||||||
|
}
|
||||||
|
|
||||||
|
additionalDescription := "\n...\n" + v.SourceURL
|
||||||
|
maxLength := 2800 - len(additionalDescription)
|
||||||
|
|
||||||
|
description := strings.TrimSpace(*v.Description)
|
||||||
|
if len(description) > maxLength {
|
||||||
|
description = description[:maxLength]
|
||||||
|
}
|
||||||
|
return description + additionalDescription
|
||||||
|
}
|
||||||
|
|
||||||
|
type VideoSource interface {
|
||||||
|
SourceName() string
|
||||||
|
GetVideo(id string) (*SourceVideo, error)
|
||||||
|
DeleteLocalCache(id string) error
|
||||||
|
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
|
||||||
|
}
|
||||||
|
|
||||||
|
type SourceScanIteratorResult struct {
|
||||||
|
Video *SourceVideo
|
||||||
|
Error error
|
||||||
|
}
|
||||||
|
|
||||||
|
type VideoPublisher interface {
|
||||||
|
Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error)
|
||||||
|
PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult
|
||||||
|
}
|
||||||
|
|
||||||
|
type PublishedVideoIteratorResult struct {
|
||||||
|
Video *PublishedVideo
|
||||||
|
Error error
|
||||||
}
|
}
|
||||||
|
|
190
local/localSDKPublisher.go
Normal file
190
local/localSDKPublisher.go
Normal file
|
@ -0,0 +1,190 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LocalSDKPublisher struct {
|
||||||
|
channelID string
|
||||||
|
publishBid float64
|
||||||
|
lbrynet *jsonrpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
|
||||||
|
lbrynet := jsonrpc.NewClient(sdkAddr)
|
||||||
|
lbrynet.SetRPCTimeout(5 * time.Minute)
|
||||||
|
|
||||||
|
status, err := lbrynet.Status()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !status.IsRunning {
|
||||||
|
return nil, errors.New("SDK is not running")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should check to see if the SDK owns the channel
|
||||||
|
|
||||||
|
// Should check to see if wallet is unlocked
|
||||||
|
// but jsonrpc.Client doesn't have WalletStatus method
|
||||||
|
// so skip for now
|
||||||
|
|
||||||
|
// Should check to see if streams are configured to be reflected and warn if not
|
||||||
|
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
|
||||||
|
// so use File.UploadingToReflector as a proxy for now
|
||||||
|
|
||||||
|
publisher := LocalSDKPublisher {
|
||||||
|
channelID: channelID,
|
||||||
|
publishBid: publishBid,
|
||||||
|
lbrynet: lbrynet,
|
||||||
|
}
|
||||||
|
return &publisher, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) {
|
||||||
|
streamCreateOptions := jsonrpc.StreamCreateOptions {
|
||||||
|
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
|
||||||
|
Title: &video.Title,
|
||||||
|
Description: &video.Description,
|
||||||
|
Languages: video.Languages,
|
||||||
|
ThumbnailURL: &video.ThumbnailURL,
|
||||||
|
Tags: video.Tags,
|
||||||
|
},
|
||||||
|
ReleaseTime: &video.ReleaseTime,
|
||||||
|
ChannelID: &p.channelID,
|
||||||
|
License: util.PtrToString("Copyrighted (contact publisher)"),
|
||||||
|
}
|
||||||
|
|
||||||
|
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var claimID *string
|
||||||
|
for _, output := range txSummary.Outputs {
|
||||||
|
if output.Type == "claim" {
|
||||||
|
claimID = &output.ClaimID
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if claimID == nil {
|
||||||
|
return "", nil, errors.New("Publish transaction did not have a claim output.")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflectStream {
|
||||||
|
return "", nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error finding file by txid: %v", err)
|
||||||
|
done <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if fileListResponse == nil {
|
||||||
|
log.Errorf("Could not find file in list with correct txid")
|
||||||
|
done <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fileStatus := fileListResponse.Items[fileIndex]
|
||||||
|
if fileStatus.IsFullyReflected {
|
||||||
|
log.Info("Stream is fully reflected")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !fileStatus.UploadingToReflector {
|
||||||
|
log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
|
||||||
|
done <- errors.New("Stream is not being reflected (check lbrynet settings).")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
done <- nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
return *claimID, done, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *LocalSDKPublisher) PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult {
|
||||||
|
videoCh := make(chan PublishedVideoIteratorResult, 10)
|
||||||
|
go func() {
|
||||||
|
defer close(videoCh)
|
||||||
|
for page := uint64(0); ; page++ {
|
||||||
|
streams, err := p.lbrynet.StreamList(nil, page, 100)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error listing streams (page %d): %v", page, err)
|
||||||
|
|
||||||
|
errResult := PublishedVideoIteratorResult {
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
videoCh <- errResult
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(streams.Items) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, stream := range streams.Items {
|
||||||
|
if stream.ChannelID != p.channelID || stream.Value.GetStream().ReleaseTime < sinceTimestamp {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
languages := []string{}
|
||||||
|
for _, language := range stream.Value.Languages {
|
||||||
|
languages = append(languages, language.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
video := PublishedVideo {
|
||||||
|
ClaimID: stream.ClaimID,
|
||||||
|
NativeID: "",
|
||||||
|
Source: "",
|
||||||
|
ClaimName: stream.Name,
|
||||||
|
Title: stream.Value.Title,
|
||||||
|
Description: stream.Value.Description,
|
||||||
|
Languages: languages,
|
||||||
|
Tags: stream.Value.Tags,
|
||||||
|
ReleaseTime: stream.Value.GetStream().ReleaseTime,
|
||||||
|
ThumbnailURL: stream.Value.Thumbnail.Url,
|
||||||
|
FullLocalPath: "",
|
||||||
|
}
|
||||||
|
|
||||||
|
videoResult := PublishedVideoIteratorResult {
|
||||||
|
Video: &video,
|
||||||
|
}
|
||||||
|
videoCh <- videoResult
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return videoCh
|
||||||
|
}
|
||||||
|
|
||||||
|
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
|
||||||
|
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
|
||||||
|
response, err := client.FileList(0, 20)
|
||||||
|
for {
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting file list page: %v", err)
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
|
||||||
|
if index < len(response.Items) {
|
||||||
|
return response, index, nil
|
||||||
|
}
|
||||||
|
if response.Page >= response.TotalPages {
|
||||||
|
return nil, 0, nil
|
||||||
|
}
|
||||||
|
response, err = client.FileList(response.Page + 1, 20)
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,6 +5,7 @@
|
||||||
- LBRY SDK (what do we actually need this for?)
|
- LBRY SDK (what do we actually need this for?)
|
||||||
- youtube-dl
|
- youtube-dl
|
||||||
- enough space to cache stuff
|
- enough space to cache stuff
|
||||||
|
- YouTube data API key
|
||||||
|
|
||||||
|
|
||||||
## Process
|
## Process
|
||||||
|
@ -15,6 +16,21 @@
|
||||||
- or easier, just error if no channel
|
- or easier, just error if no channel
|
||||||
- enough lbc in wallet?
|
- enough lbc in wallet?
|
||||||
|
|
||||||
|
### Getting a YouTube API key
|
||||||
|
|
||||||
|
To access the YouTube data API, you will first need some kind of google account.
|
||||||
|
|
||||||
|
The API has two methods of authentication, OAuth2 and API keys. This application uses API keys.
|
||||||
|
These API keys are basically like passwords, and so once obtained, they should not be shared.
|
||||||
|
|
||||||
|
The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application):
|
||||||
|
|
||||||
|
|
||||||
|
1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console.
|
||||||
|
2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**.
|
||||||
|
|
||||||
|
To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys).
|
||||||
|
|
||||||
### Options to figure out what's already synced
|
### Options to figure out what's already synced
|
||||||
|
|
||||||
- simplest: assume nothing is synced yet
|
- simplest: assume nothing is synced yet
|
||||||
|
@ -50,4 +66,4 @@
|
||||||
|
|
||||||
### Debugging
|
### Debugging
|
||||||
|
|
||||||
- dry-running the whole thing
|
- dry-running the whole thing
|
||||||
|
|
676
local/syncDb.go
Normal file
676
local/syncDb.go
Normal file
|
@ -0,0 +1,676 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncDb struct {
|
||||||
|
db *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyncDbSummary struct {
|
||||||
|
Total int
|
||||||
|
CachedUnpublished int
|
||||||
|
UncachedUnpublished int
|
||||||
|
LatestKnown int64
|
||||||
|
LatestPublished int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSyncDb(path string) (*SyncDb, error) {
|
||||||
|
db, err := sql.Open("sqlite3", path)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error opening cache DB at %s: %v", path, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cache := SyncDb {
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
err = cache.ensureSchema()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while ensuring sync DB structure: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &cache, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) Close() error {
|
||||||
|
return c.db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) SaveKnownVideo(video SourceVideo) error {
|
||||||
|
if video.ID == "" {
|
||||||
|
log.Warnf("Trying to save a video with no ID: %v", video)
|
||||||
|
}
|
||||||
|
insertSql := `
|
||||||
|
INSERT INTO videos (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id)
|
||||||
|
DO NOTHING;
|
||||||
|
`
|
||||||
|
|
||||||
|
r := SyncRecordFromSourceVideo(video)
|
||||||
|
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
insertSql,
|
||||||
|
r.Source,
|
||||||
|
r.NativeID,
|
||||||
|
r.Title,
|
||||||
|
r.Description,
|
||||||
|
r.SourceURL,
|
||||||
|
r.ReleaseTime,
|
||||||
|
r.ThumbnailURL,
|
||||||
|
r.FullLocalPath,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO videos (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
claim_name,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
claim_name = excluded.claim_name,
|
||||||
|
title = excluded.title,
|
||||||
|
description = excluded.description,
|
||||||
|
source_url = excluded.source_url,
|
||||||
|
release_time = excluded.release_time,
|
||||||
|
thumbnail_url = excluded.thumbnail_url,
|
||||||
|
full_local_path = excluded.full_local_path;
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
video.Source,
|
||||||
|
video.ID,
|
||||||
|
video.ClaimName,
|
||||||
|
video.Title,
|
||||||
|
video.Description,
|
||||||
|
video.SourceURL,
|
||||||
|
video.ReleaseTime,
|
||||||
|
video.ThumbnailURL,
|
||||||
|
video.FullLocalPath,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO videos (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
claim_id,
|
||||||
|
claim_name,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
claim_id = excluded.claim_id,
|
||||||
|
claim_name = excluded.claim_name,
|
||||||
|
title = excluded.title,
|
||||||
|
description = excluded.description,
|
||||||
|
source_url = excluded.source_url,
|
||||||
|
release_time = excluded.release_time,
|
||||||
|
thumbnail_url = excluded.thumbnail_url,
|
||||||
|
full_local_path = excluded.full_local_path;
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
video.Source,
|
||||||
|
video.NativeID,
|
||||||
|
video.ClaimID,
|
||||||
|
video.ClaimName,
|
||||||
|
video.Title,
|
||||||
|
video.Description,
|
||||||
|
video.SourceURL,
|
||||||
|
video.ReleaseTime,
|
||||||
|
video.ThumbnailURL,
|
||||||
|
video.FullLocalPath,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
err = c.upsertTags(video.Source, video.NativeID, video.Tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertLanguages(video.Source, video.NativeID, video.Languages)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) MarkVideoUncached(source, id string) error {
|
||||||
|
updateSql := `
|
||||||
|
UPDATE videos
|
||||||
|
SET full_local_path = NULL
|
||||||
|
WHERE source = ? AND native_id = ?;
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
updateSql,
|
||||||
|
source,
|
||||||
|
id,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) _SaveVideoData(video SourceVideo) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO videos (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
title = excluded.title,
|
||||||
|
description = excluded.description,
|
||||||
|
source_url = excluded.source_url,
|
||||||
|
release_time = excluded.release_time,
|
||||||
|
thumbnail_url = excluded.thumbnail_url,
|
||||||
|
full_local_path = excluded.full_local_path;
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
video.Source,
|
||||||
|
video.ID,
|
||||||
|
video.Title,
|
||||||
|
video.Description,
|
||||||
|
video.SourceURL,
|
||||||
|
video.ReleaseTime,
|
||||||
|
video.ThumbnailURL,
|
||||||
|
video.FullLocalPath,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO videos (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path,
|
||||||
|
claim_id
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
title = excluded.title,
|
||||||
|
description = excluded.description,
|
||||||
|
source_url = excluded.source_url,
|
||||||
|
release_time = excluded.release_time,
|
||||||
|
thumbnail_url = excluded.thumbnail_url,
|
||||||
|
full_local_path = excluded.full_local_path,
|
||||||
|
claim_id = excluded.claim_id;
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
video.Source,
|
||||||
|
video.ID,
|
||||||
|
video.Title,
|
||||||
|
video.Description,
|
||||||
|
video.SourceURL,
|
||||||
|
video.ReleaseTime,
|
||||||
|
video.ThumbnailURL,
|
||||||
|
video.FullLocalPath,
|
||||||
|
claimID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT
|
||||||
|
claim_id
|
||||||
|
FROM videos
|
||||||
|
WHERE source = ? AND native_id = ?
|
||||||
|
`
|
||||||
|
row := c.db.QueryRow(selectSql, source, id)
|
||||||
|
|
||||||
|
var claimID sql.NullString
|
||||||
|
err := row.Scan(&claimID)
|
||||||
|
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return false, "", nil
|
||||||
|
} else if err != nil {
|
||||||
|
log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err)
|
||||||
|
return false, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if claimID.Valid {
|
||||||
|
return true, claimID.String, nil
|
||||||
|
} else {
|
||||||
|
return false, "", nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT
|
||||||
|
full_local_path
|
||||||
|
FROM videos
|
||||||
|
WHERE source = ? AND native_id = ?
|
||||||
|
`
|
||||||
|
row := c.db.QueryRow(selectSql, source, id)
|
||||||
|
|
||||||
|
var localPath sql.NullString
|
||||||
|
err := row.Scan(&localPath)
|
||||||
|
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return false, "", nil
|
||||||
|
} else if err != nil {
|
||||||
|
log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err)
|
||||||
|
return false, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if localPath.Valid {
|
||||||
|
return true, localPath.String, nil
|
||||||
|
} else {
|
||||||
|
return false, "", nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT
|
||||||
|
native_id,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path,
|
||||||
|
claim_id
|
||||||
|
FROM videos
|
||||||
|
WHERE source = ? AND native_id = ?
|
||||||
|
`
|
||||||
|
row := c.db.QueryRow(selectSql, source, id)
|
||||||
|
|
||||||
|
var record SyncRecord
|
||||||
|
err := row.Scan(
|
||||||
|
&record.NativeID,
|
||||||
|
&record.Title,
|
||||||
|
&record.Description,
|
||||||
|
&record.SourceURL,
|
||||||
|
&record.ReleaseTime,
|
||||||
|
&record.ThumbnailURL,
|
||||||
|
&record.FullLocalPath,
|
||||||
|
&record.ClaimID,
|
||||||
|
)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
log.Debugf("Data for %s:%s is not in the sync DB", source, id)
|
||||||
|
return nil, nil
|
||||||
|
} else if err != nil {
|
||||||
|
log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if includeTags {
|
||||||
|
tags, err := c.getTags(source, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
record.Tags = &tags
|
||||||
|
}
|
||||||
|
|
||||||
|
if includeLanguages {
|
||||||
|
languages, err := c.getLanguages(source, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
record.Languages = &languages
|
||||||
|
}
|
||||||
|
|
||||||
|
return &record, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT
|
||||||
|
native_id
|
||||||
|
FROM videos
|
||||||
|
WHERE source = ? AND claim_id IS NULL
|
||||||
|
`
|
||||||
|
ids := []string{}
|
||||||
|
|
||||||
|
rows, err := c.db.Query(selectSql, source)
|
||||||
|
if err != nil {
|
||||||
|
return ids, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var id string
|
||||||
|
err = rows.Scan(&id)
|
||||||
|
if err != nil {
|
||||||
|
return ids, err
|
||||||
|
}
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) GetSummary() (*SyncDbSummary, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT
|
||||||
|
COUNT() AS total,
|
||||||
|
COUNT(v_unpub.full_local_path) AS cached_unpublished,
|
||||||
|
COUNT(v_all.claim_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished,
|
||||||
|
MAX(v_all.release_time) AS latest_known,
|
||||||
|
MAX(v_pub.release_time) AS latest_published
|
||||||
|
FROM videos v_all
|
||||||
|
LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL
|
||||||
|
LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL
|
||||||
|
`
|
||||||
|
row := c.db.QueryRow(selectSql)
|
||||||
|
|
||||||
|
var summary SyncDbSummary
|
||||||
|
var latestKnown, latestPublished sql.NullInt64
|
||||||
|
err := row.Scan(
|
||||||
|
&summary.Total,
|
||||||
|
&summary.CachedUnpublished,
|
||||||
|
&summary.UncachedUnpublished,
|
||||||
|
&latestKnown,
|
||||||
|
&latestPublished,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error querying sync DB summary: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if latestKnown.Valid {
|
||||||
|
summary.LatestKnown = latestKnown.Int64
|
||||||
|
}
|
||||||
|
if latestPublished.Valid {
|
||||||
|
summary.LatestPublished = latestPublished.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
return &summary, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) ensureSchema() error {
|
||||||
|
createSql := `
|
||||||
|
CREATE TABLE IF NOT EXISTS videos (
|
||||||
|
source TEXT,
|
||||||
|
native_id TEXT,
|
||||||
|
title TEXT,
|
||||||
|
description TEXT,
|
||||||
|
source_url TEXT,
|
||||||
|
release_time INT,
|
||||||
|
thumbnail_url TEXT,
|
||||||
|
full_local_path TEXT,
|
||||||
|
claim_name TEXT,
|
||||||
|
claim_id TEXT,
|
||||||
|
PRIMARY KEY (source, native_id)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS video_tags (
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
native_id TEXT NOT NULL,
|
||||||
|
tag TEXT NOT NULL,
|
||||||
|
UNIQUE (source, native_id, tag)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS video_languages (
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
native_id TEXT NOT NULL,
|
||||||
|
language TEXT NOT NULL,
|
||||||
|
UNIQUE (source, native_id, language)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(createSql)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) upsertTags(source, id string, tags []string) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO video_tags (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
tag
|
||||||
|
) VALUES (?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id, tag)
|
||||||
|
DO NOTHING;
|
||||||
|
`
|
||||||
|
|
||||||
|
for _, tag := range tags {
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
source,
|
||||||
|
id,
|
||||||
|
tag,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) getTags(source, id string) ([]string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT tag
|
||||||
|
FROM video_tags
|
||||||
|
WHERE source = ? AND native_id = ?;
|
||||||
|
`
|
||||||
|
|
||||||
|
rows, err := c.db.Query(selectSql, source, id)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var tags []string
|
||||||
|
for rows.Next() {
|
||||||
|
var tag string
|
||||||
|
err = rows.Scan(&tag)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tags = append(tags, tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) upsertLanguages(source, id string, languages []string) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO video_languages (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
language
|
||||||
|
) VALUES (?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id, language)
|
||||||
|
DO NOTHING;
|
||||||
|
`
|
||||||
|
|
||||||
|
for _, language := range languages {
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
source,
|
||||||
|
id,
|
||||||
|
language,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) getLanguages(source, id string) ([]string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT language
|
||||||
|
FROM video_languages
|
||||||
|
WHERE source = ? AND native_id = ?;
|
||||||
|
`
|
||||||
|
|
||||||
|
rows, err := c.db.Query(selectSql, source, id)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var languages []string
|
||||||
|
for rows.Next() {
|
||||||
|
var language string
|
||||||
|
err = rows.Scan(&language)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
languages = append(languages, language)
|
||||||
|
}
|
||||||
|
|
||||||
|
return languages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyncRecord struct {
|
||||||
|
Source string
|
||||||
|
NativeID string
|
||||||
|
Title sql.NullString
|
||||||
|
Description sql.NullString
|
||||||
|
SourceURL sql.NullString
|
||||||
|
ReleaseTime sql.NullInt64
|
||||||
|
ThumbnailURL sql.NullString
|
||||||
|
FullLocalPath sql.NullString
|
||||||
|
ClaimID sql.NullString
|
||||||
|
Tags *[]string
|
||||||
|
Languages *[]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord {
|
||||||
|
r := SyncRecord {
|
||||||
|
Source: v.Source,
|
||||||
|
NativeID: v.ID,
|
||||||
|
SourceURL: sql.NullString { String: v.SourceURL, Valid: true },
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.Title != nil {
|
||||||
|
r.Title = sql.NullString { String: *v.Title, Valid: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.Description != nil {
|
||||||
|
r.Description = sql.NullString { String: *v.Description, Valid: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.ThumbnailURL != nil {
|
||||||
|
r.ThumbnailURL = sql.NullString { String: *v.ThumbnailURL, Valid: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.FullLocalPath != nil {
|
||||||
|
r.FullLocalPath = sql.NullString { String: *v.FullLocalPath, Valid: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.ReleaseTime != nil {
|
||||||
|
r.ReleaseTime = sql.NullInt64 { Int64: *v.ReleaseTime, Valid: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(v.Tags) > 0 {
|
||||||
|
r.Tags = &v.Tags
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(v.Languages) > 0 {
|
||||||
|
r.Languages = &v.Languages
|
||||||
|
}
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *SyncRecord) ToPublishableVideo() *PublishableVideo {
|
||||||
|
if !(r.Title.Valid &&
|
||||||
|
r.Description.Valid &&
|
||||||
|
r.SourceURL.Valid &&
|
||||||
|
r.ReleaseTime.Valid &&
|
||||||
|
r.ThumbnailURL.Valid &&
|
||||||
|
r.FullLocalPath.Valid &&
|
||||||
|
r.Tags != nil &&
|
||||||
|
r.Languages != nil) {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
video := PublishableVideo {
|
||||||
|
ID: r.NativeID,
|
||||||
|
Source: r.Source,
|
||||||
|
Description: r.Description.String,
|
||||||
|
SourceURL: r.SourceURL.String,
|
||||||
|
ReleaseTime: r.ReleaseTime.Int64,
|
||||||
|
ThumbnailURL: r.ThumbnailURL.String,
|
||||||
|
FullLocalPath: r.FullLocalPath.String,
|
||||||
|
Tags: *r.Tags,
|
||||||
|
Languages: *r.Languages,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &video
|
||||||
|
}
|
51
local/youtubeChannelScanner.go
Normal file
51
local/youtubeChannelScanner.go
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
type YouTubeChannelScanner interface {
|
||||||
|
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
|
||||||
|
}
|
||||||
|
|
||||||
|
type YouTubeAPIChannelScanner struct {
|
||||||
|
api *YouTubeAPI
|
||||||
|
channel string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYouTubeAPIChannelScanner(apiKey, channel string) (*YouTubeAPIChannelScanner) {
|
||||||
|
scanner := YouTubeAPIChannelScanner {
|
||||||
|
api: NewYouTubeAPI(apiKey),
|
||||||
|
channel: channel,
|
||||||
|
}
|
||||||
|
return &scanner
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *YouTubeAPIChannelScanner) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
|
||||||
|
videoCh := make(chan SourceScanIteratorResult, 10)
|
||||||
|
go func() {
|
||||||
|
defer close(videoCh)
|
||||||
|
|
||||||
|
for firstRun, nextPage := true, ""; firstRun || nextPage != ""; {
|
||||||
|
var videos []SourceVideo
|
||||||
|
var err error
|
||||||
|
|
||||||
|
firstRun = false
|
||||||
|
|
||||||
|
videos, nextPage, err = s.api.GetChannelVideosPage(s.channel, sinceTimestamp, nextPage)
|
||||||
|
if err != nil {
|
||||||
|
videoCh <- SourceScanIteratorResult {
|
||||||
|
Video: nil,
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, video := range videos {
|
||||||
|
outVideo := video
|
||||||
|
videoCh <- SourceScanIteratorResult {
|
||||||
|
Video: &outVideo,
|
||||||
|
Error: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return videoCh
|
||||||
|
}
|
74
local/youtubeEnricher.go
Normal file
74
local/youtubeEnricher.go
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type YouTubeVideoEnricher interface {
|
||||||
|
EnrichMissing(source *SourceVideo) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type YouTubeAPIVideoEnricher struct {
|
||||||
|
api *YouTubeAPI
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) {
|
||||||
|
enricher := YouTubeAPIVideoEnricher{
|
||||||
|
api: NewYouTubeAPI(apiKey),
|
||||||
|
}
|
||||||
|
return &enricher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
|
||||||
|
if source.ReleaseTime != nil {
|
||||||
|
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
snippet, err := e.api.GetVideoSnippet(source.ID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error snippet data for video %s: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error converting publishedAt to timestamp: %v", err)
|
||||||
|
} else {
|
||||||
|
source.ReleaseTime = util.PtrToInt64(publishedAt.Unix())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type CacheVideoEnricher struct {
|
||||||
|
syncDB *SyncDb
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCacheVideoEnricher(syncDB *SyncDb) *CacheVideoEnricher {
|
||||||
|
enricher := CacheVideoEnricher {
|
||||||
|
syncDB,
|
||||||
|
}
|
||||||
|
return &enricher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *CacheVideoEnricher) EnrichMissing(source *SourceVideo) error {
|
||||||
|
if source.ReleaseTime != nil {
|
||||||
|
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cached, err := e.syncDB.GetVideoRecord(source.Source, source.ID, false, false)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting cached video %s: %v", source.ID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cached != nil && cached.ReleaseTime.Valid {
|
||||||
|
source.ReleaseTime = &cached.ReleaseTime.Int64
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
157
local/ytapi.go
Normal file
157
local/ytapi.go
Normal file
|
@ -0,0 +1,157 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type YouTubeAPI struct {
|
||||||
|
apiKey string
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYouTubeAPI(apiKey string) (*YouTubeAPI) {
|
||||||
|
client := &http.Client {
|
||||||
|
Transport: &http.Transport{
|
||||||
|
MaxIdleConns: 10,
|
||||||
|
IdleConnTimeout: 30 * time.Second,
|
||||||
|
DisableCompression: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
api := YouTubeAPI {
|
||||||
|
apiKey: apiKey,
|
||||||
|
client: client,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &api
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
|
||||||
|
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error creating http client for YouTube API: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
query := req.URL.Query()
|
||||||
|
query.Add("part", "snippet")
|
||||||
|
query.Add("id", videoID)
|
||||||
|
query.Add("key", a.apiKey)
|
||||||
|
req.URL.RawQuery = query.Encode()
|
||||||
|
|
||||||
|
req.Header.Add("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error from YouTube API: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
log.Tracef("Response from YouTube API: %s", string(body[:]))
|
||||||
|
|
||||||
|
var result videoListResponse
|
||||||
|
err = json.Unmarshal(body, &result)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(result.Items) != 1 {
|
||||||
|
err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &result.Items[0].Snippet, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *YouTubeAPI) GetChannelVideosPage(channelID string, publishedAfter int64, pageToken string) ([]SourceVideo, string, error) {
|
||||||
|
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/search", nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error creating http client for YouTube API: %v", err)
|
||||||
|
return []SourceVideo{}, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
query := req.URL.Query()
|
||||||
|
query.Add("part", "snippet")
|
||||||
|
query.Add("type", "video")
|
||||||
|
query.Add("channelId", channelID)
|
||||||
|
query.Add("publishedAfter", time.Unix(publishedAfter, 0).Format(time.RFC3339))
|
||||||
|
query.Add("maxResults", "5")
|
||||||
|
if pageToken != "" {
|
||||||
|
query.Add("pageToken", pageToken)
|
||||||
|
}
|
||||||
|
query.Add("key", a.apiKey)
|
||||||
|
req.URL.RawQuery = query.Encode()
|
||||||
|
|
||||||
|
req.Header.Add("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error from YouTube API: %v", err)
|
||||||
|
return []SourceVideo{}, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
log.Tracef("Response from YouTube API: %s", string(body[:]))
|
||||||
|
|
||||||
|
var result videoSearchResponse
|
||||||
|
err = json.Unmarshal(body, &result)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
|
||||||
|
return []SourceVideo{}, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
videos := []SourceVideo{}
|
||||||
|
for _, item := range result.Items {
|
||||||
|
var releaseTime *int64
|
||||||
|
publishedAt, err := time.Parse(time.RFC3339, item.Snippet.PublishedAt)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Unable to parse publish time of %s while scanning YouTube channel %s: %v", item.ID.VideoID, channelID)
|
||||||
|
releaseTime = nil
|
||||||
|
} else {
|
||||||
|
releaseTime = util.PtrToInt64(publishedAt.Unix())
|
||||||
|
}
|
||||||
|
|
||||||
|
video := SourceVideo {
|
||||||
|
ID: item.ID.VideoID,
|
||||||
|
Source: "YouTube",
|
||||||
|
ReleaseTime: releaseTime,
|
||||||
|
}
|
||||||
|
videos = append(videos, video)
|
||||||
|
}
|
||||||
|
|
||||||
|
return videos, result.NextPageToken, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type videoListResponse struct {
|
||||||
|
NextPageToken string `json:"nextPageToken"`
|
||||||
|
Items []struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Snippet VideoSnippet `json:"snippet"`
|
||||||
|
} `json:"items"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type videoSearchResponse struct {
|
||||||
|
NextPageToken string `json:"nextPageToken"`
|
||||||
|
Items []struct {
|
||||||
|
ID struct{
|
||||||
|
VideoID string `json:"videoId"`
|
||||||
|
} `json:"id"`
|
||||||
|
Snippet VideoSnippet `json:"snippet"`
|
||||||
|
} `json:"items"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type VideoSnippet struct {
|
||||||
|
PublishedAt string `json:"publishedAt"`
|
||||||
|
}
|
239
local/ytdl.go
Normal file
239
local/ytdl.go
Normal file
|
@ -0,0 +1,239 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/ytsync/v5/downloader/ytdl"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Ytdl struct {
|
||||||
|
DownloadDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYtdl(downloadDir string) (*Ytdl, error) {
|
||||||
|
// TODO validate download dir
|
||||||
|
|
||||||
|
y := Ytdl {
|
||||||
|
DownloadDir: downloadDir,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &y, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) {
|
||||||
|
metadataPath, err := y.GetVideoMetadataFile(videoID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
metadataBytes, err := os.ReadFile(metadataPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var metadata *ytdl.YtdlVideo
|
||||||
|
err = json.Unmarshal(metadataBytes, &metadata)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return metadata, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) {
|
||||||
|
basePath := path.Join(y.DownloadDir, videoID)
|
||||||
|
metadataPath := basePath + ".info.json"
|
||||||
|
|
||||||
|
_, err := os.Stat(metadataPath)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
log.Errorf("Error determining if video metadata already exists: %v", err)
|
||||||
|
return "", err
|
||||||
|
} else if err != nil {
|
||||||
|
log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID)
|
||||||
|
err = downloadVideoMetadata(basePath, videoID)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return metadataPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (y *Ytdl) GetVideoFile(videoID string) (string, error) {
|
||||||
|
videoPath, err := findDownloadedVideo(y.DownloadDir, videoID)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if videoPath != nil {
|
||||||
|
return *videoPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
basePath := path.Join(y.DownloadDir, videoID)
|
||||||
|
metadataPath, err := y.GetVideoMetadataFile(videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting metadata path in preparation for video download: %v", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
err = downloadVideo(basePath, metadataPath)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
videoPath, err = findDownloadedVideo(y.DownloadDir, videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if videoPath == nil {
|
||||||
|
return "", errors.New("Could not find a downloaded video after successful download.")
|
||||||
|
}
|
||||||
|
|
||||||
|
return *videoPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (y *Ytdl) DeleteVideoFiles(videoID string) error {
|
||||||
|
files, err := ioutil.ReadDir(y.DownloadDir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range files {
|
||||||
|
if f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.Contains(f.Name(), videoID) {
|
||||||
|
videoPath := path.Join(y.DownloadDir, f.Name())
|
||||||
|
err = os.Remove(videoPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteFile(path string) error {
|
||||||
|
_, err := os.Stat(path)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
log.Errorf("Error determining if file %s exists: %v", path, err)
|
||||||
|
return err
|
||||||
|
} else if err != nil {
|
||||||
|
log.Debugf("File %s does not exist. Skipping deletion.", path)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.Remove(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func findDownloadedVideo(videoDir, videoID string) (*string, error) {
|
||||||
|
files, err := ioutil.ReadDir(videoDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range files {
|
||||||
|
if f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) {
|
||||||
|
videoPath := path.Join(videoDir, f.Name())
|
||||||
|
return &videoPath, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func downloadVideoMetadata(basePath, videoID string) error {
|
||||||
|
ytdlArgs := []string{
|
||||||
|
"--skip-download",
|
||||||
|
"--write-info-json",
|
||||||
|
"--force-overwrites",
|
||||||
|
fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
|
||||||
|
"--cookies",
|
||||||
|
"cookies.txt",
|
||||||
|
"-o",
|
||||||
|
basePath,
|
||||||
|
}
|
||||||
|
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
|
||||||
|
output, err := runCmd(ytdlCmd)
|
||||||
|
log.Debug(output)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func downloadVideo(basePath, metadataPath string) error {
|
||||||
|
ytdlArgs := []string{
|
||||||
|
"--no-progress",
|
||||||
|
"-o",
|
||||||
|
basePath,
|
||||||
|
"--merge-output-format",
|
||||||
|
"mp4",
|
||||||
|
"--postprocessor-args",
|
||||||
|
"ffmpeg:-movflags faststart",
|
||||||
|
"--abort-on-unavailable-fragment",
|
||||||
|
"--fragment-retries",
|
||||||
|
"1",
|
||||||
|
"--cookies",
|
||||||
|
"cookies.txt",
|
||||||
|
"--extractor-args",
|
||||||
|
"youtube:player_client=android",
|
||||||
|
"--load-info-json",
|
||||||
|
metadataPath,
|
||||||
|
"-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]",
|
||||||
|
}
|
||||||
|
|
||||||
|
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
|
||||||
|
output, err := runCmd(ytdlCmd)
|
||||||
|
log.Debug(output)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func runCmd(cmd *exec.Cmd) ([]string, error) {
|
||||||
|
log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
|
||||||
|
var err error
|
||||||
|
stderr, err := cmd.StderrPipe()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
stdout, err := cmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = cmd.Start()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
outLog, err := ioutil.ReadAll(stdout)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
errorLog, err := ioutil.ReadAll(stderr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
done <- cmd.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-done:
|
||||||
|
if err != nil {
|
||||||
|
log.Error(string(errorLog))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
|
||||||
|
}
|
||||||
|
}
|
101
local/ytdlVideoSource.go
Normal file
101
local/ytdlVideoSource.go
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/ytsync/v5/downloader/ytdl"
|
||||||
|
)
|
||||||
|
|
||||||
|
type YtdlVideoSource struct {
|
||||||
|
downloader Ytdl
|
||||||
|
channelScanner YouTubeChannelScanner
|
||||||
|
enrichers []YouTubeVideoEnricher
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig, syncDB *SyncDb) (*YtdlVideoSource, error) {
|
||||||
|
ytdl, err := NewYtdl(downloadDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
source := YtdlVideoSource {
|
||||||
|
downloader: *ytdl,
|
||||||
|
}
|
||||||
|
|
||||||
|
if syncDB != nil {
|
||||||
|
source.enrichers = append(source.enrichers, NewCacheVideoEnricher(syncDB))
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.APIKey != "" {
|
||||||
|
ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey)
|
||||||
|
source.enrichers = append(source.enrichers, ytapiEnricher)
|
||||||
|
source.channelScanner = NewYouTubeAPIChannelScanner(config.APIKey, config.ChannelID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if source.channelScanner == nil {
|
||||||
|
log.Warnf("No means of scanning source channels has been provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &source, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *YtdlVideoSource) SourceName() string {
|
||||||
|
return "YouTube"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
|
||||||
|
metadata, err := s.downloader.GetVideoMetadata(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
videoPath, err := s.downloader.GetVideoFile(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var bestThumbnail *ytdl.Thumbnail = nil
|
||||||
|
for i, thumbnail := range metadata.Thumbnails {
|
||||||
|
if i == 0 || bestThumbnail.Width < thumbnail.Width {
|
||||||
|
bestThumbnail = &thumbnail
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceVideo := SourceVideo {
|
||||||
|
ID: id,
|
||||||
|
Source: "YouTube",
|
||||||
|
Title: &metadata.Title,
|
||||||
|
Description: &metadata.Description,
|
||||||
|
SourceURL: "\nhttps://www.youtube.com/watch?v=" + id,
|
||||||
|
Languages: []string{},
|
||||||
|
Tags: metadata.Tags,
|
||||||
|
ReleaseTime: nil,
|
||||||
|
ThumbnailURL: &bestThumbnail.URL,
|
||||||
|
FullLocalPath: &videoPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, enricher := range s.enrichers {
|
||||||
|
err = enricher.EnrichMissing(&sourceVideo)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Source video retrieved via ytdl: %v", sourceVideo)
|
||||||
|
|
||||||
|
return &sourceVideo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *YtdlVideoSource) DeleteLocalCache(id string) error {
|
||||||
|
return s.downloader.DeleteVideoFiles(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *YtdlVideoSource) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
|
||||||
|
if s.channelScanner != nil {
|
||||||
|
return s.channelScanner.Scan(sinceTimestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoCh := make(chan SourceScanIteratorResult, 1)
|
||||||
|
close(videoCh)
|
||||||
|
return videoCh
|
||||||
|
}
|
Loading…
Reference in a new issue