Tracking synced videos #115

Open
pseudoscalar wants to merge 10 commits from pseudoscalar/issue/112/v2 into local
11 changed files with 1949 additions and 5 deletions

1
go.mod
View file

@ -22,6 +22,7 @@ require (
github.com/kr/pretty v0.2.1 // indirect
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
github.com/mattn/go-sqlite3 v1.10.0
github.com/miekg/dns v1.1.22 // indirect
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect

1
go.sum
View file

@ -318,6 +318,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=

View file

@ -1,22 +1,460 @@
package local
import (
"fmt"
"errors"
"os"
"regexp"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/abadojack/whatlanggo"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/tags_manager"
)
type SyncContext struct {
DryRun bool
KeepCache bool
ReflectStreams bool
ForceChannelScan bool
TempDir string
SyncDbPath string
LbrynetAddr string
ChannelID string
VideoID string
PublishBid float64
YouTubeSourceConfig *YouTubeSourceConfig
}
func (c *SyncContext) Validate() error {
if c.TempDir == "" {
return errors.New("No TempDir provided")
}
if c.SyncDbPath == "" {
return errors.New("No sync DB path provided")
}
if c.LbrynetAddr == "" {
return errors.New("No Lbrynet address provided")
}
if c.ChannelID == "" {
return errors.New("No channel ID provided")
}
if c.PublishBid <= 0.0 {
return errors.New("Publish bid is not greater than zero")
}
if c.YouTubeSourceConfig.ChannelID != "" {
// Validate for YouTube source
// For now, an API key is required
if c.YouTubeSourceConfig.APIKey == "" {
return errors.New("YouTube source was selected, but no YouTube API key was provided.")
}
} else {
return errors.New("No video source provided")
}
return nil
}
type YouTubeSourceConfig struct {
ChannelID string
APIKey string
}
var syncContext SyncContext
func AddCommand(rootCmd *cobra.Command) {
cmd := &cobra.Command{
Use: "local",
Short: "run a personal ytsync",
Run: localCmd,
Args: cobra.ExactArgs(0),
}
//cmd.Flags().StringVar(&cache, "cache", "", "path to cache")
rootCmd.AddCommand(cmd)
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
cmd.Flags().BoolVar(&syncContext.ForceChannelScan, "force-rescan", false, "Rescan channel to fill the sync DB.")
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB")
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
cmd.Flags().StringVar(&syncContext.VideoID, "video-id", "", "ID of video to sync. This will attempt to sync only this one video.")
syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.APIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.ChannelID, "youtube-channel", "", "YouTube Channel ID")
rootCmd.AddCommand(cmd)
}
func getEnvDefault(key, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}
func localCmd(cmd *cobra.Command, args []string) {
fmt.Println("local")
err := syncContext.Validate()
if err != nil {
log.Error(err)
return
}
syncDB, err := NewSyncDb(syncContext.SyncDbPath)
if err != nil {
log.Errorf("Error creating sync DB: %v", err)
return
}
defer syncDB.Close()
var publisher VideoPublisher
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
if err != nil {
log.Errorf("Error setting up publisher: %v", err)
return
}
var videoSource VideoSource
if syncContext.YouTubeSourceConfig != nil {
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig, syncDB)
if err != nil {
log.Errorf("Error setting up video source: %v", err)
return
}
}
latestPublishedReleaseTime := int64(0)
latestKnownReleaseTime := int64(0)
if syncContext.ForceChannelScan {
log.Infof("Channel scan is being forced.")
} else {
dbSummary, err := syncDB.GetSummary()
if err != nil {
log.Errorf("Error getting sync DB summary for update scan: %v", err)
return
}
latestPublishedReleaseTime = dbSummary.LatestPublished
latestKnownReleaseTime = dbSummary.LatestKnown
}
log.Debugf("Latest known release time: %d", latestKnownReleaseTime)
for result := range videoSource.Scan(latestKnownReleaseTime) {
if result.Error != nil {
log.Errorf("Error while discovering new videos from source: %v", result.Error)
} else {
syncDB.SaveKnownVideo(*result.Video)
}
}
log.Debugf("Latest published release time: %d", latestPublishedReleaseTime)
for result := range publisher.PublishedVideoIterator(latestPublishedReleaseTime) {
if result.Error != nil {
log.Errorf("Error while discovering published videos: %v", result.Error)
} else {
syncDB.SavePublishedVideo(*result.Video)
}
}
var videoIDs []string
if syncContext.VideoID == "" {
videoIDs, err = syncDB.GetUnpublishedIDs(videoSource.SourceName())
if err != nil {
log.Errorf("Error getting unpublished videos from sync DB: %v", err)
return
}
} else {
videoIDs = []string{ syncContext.VideoID }
}
log.Debugf("Syncing videos: %v", videoIDs)
for _, videoID := range videoIDs {
err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID)
if err != nil {
log.Errorf("Error syncing %s: %v", videoID, err)
return
}
}
log.Info("Done")
}
func cacheVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, videoID string) (*PublishableVideo, error) {
log.Debugf("Ensuring video %s:%s is cached", videoSource.SourceName(), videoID)
videoRecord, err := syncDB.GetVideoRecord(videoSource.SourceName(), videoID, true, true)
if err != nil {
log.Errorf("Error checking if video is already cached: %v", err)
return nil, err
}
if videoRecord != nil && videoRecord.FullLocalPath.Valid {
log.Debugf("%s:%s is already cached.", videoSource.SourceName(), videoID)
video := videoRecord.ToPublishableVideo()
if video == nil {
log.Warnf("%s:%s appears to be cached locally, but has missing data. Caching again.")
}
return video, nil
}
log.Debugf("%s:%s is not cached locally. Caching now.", videoSource.SourceName(), videoID)
sourceVideo, err := videoSource.GetVideo(videoID)
if err != nil {
log.Errorf("Error getting source video: %v", err)
return nil, err
}
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
if err != nil {
log.Errorf("Error processing source video for publishing: %v", err)
return nil, err
}
err = syncDB.SavePublishableVideo(*processedVideo)
if err != nil {
log.Errorf("Error saving video data: %v", err)
return nil, err
}
return processedVideo, nil
}
func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error {
log.Debugf("Running sync for video %s:%s", videoSource.SourceName(), videoID)
isSynced, claimID, err := syncDB.IsVideoPublished(videoSource.SourceName(), videoID)
if err != nil {
log.Errorf("Error checking if video is already synced: %v", err)
return err
}
if isSynced {
log.Infof("Video %s:%s is already published as %s.", videoSource.SourceName(), videoID, claimID)
return nil
}
processedVideo, err := cacheVideo(syncContext, syncDB, videoSource, videoID)
if err != nil {
log.Errorf("Error ensuring video is cached prior to publication: %v", err)
return err
}
if syncContext.DryRun {
log.Infoln("This is a dry run. Nothing will be published.")
log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
log.Debugf("Object to be published: %v", processedVideo)
} else {
claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
if err != nil {
log.Errorf("Error publishing video: %v", err)
return err
}
err = syncDB.SavePublishedVideo((*processedVideo).ToPublished(claimID))
if err != nil {
// Sync DB is corrupted after getting here
// and will allow double publication.
log.Errorf("Error saving video publication to sync DB: %v", err)
return err
}
if syncContext.ReflectStreams {
err = <-doneReflectingCh
if err != nil {
log.Errorf("Error while wating for stream to reflect: %v", err)
return err
}
} else {
log.Debugln("Not waiting for stream to reflect.")
}
}
if !syncContext.KeepCache {
log.Infof("Deleting local files.")
err = syncDB.MarkVideoUncached(videoSource.SourceName(), videoID)
if err != nil {
log.Errorf("Error marking video %s:%s as uncached in syncDB", videoSource.SourceName(), videoID)
return err
}
err = videoSource.DeleteLocalCache(videoID)
if err != nil {
log.Errorf("Error deleting local files for video %s: %v", videoID, err)
return err
}
}
return nil
}
type SourceVideo struct {
ID string
Source string
Title *string
Description *string
SourceURL string
Languages []string
Tags []string
ReleaseTime *int64
ThumbnailURL *string
FullLocalPath *string
}
type PublishableVideo struct {
ID string
Source string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
}
func (v PublishableVideo) ToPublished(claimID string) PublishedVideo {
return PublishedVideo {
ClaimID: claimID,
NativeID: v.ID,
Source: v.Source,
ClaimName: v.ClaimName,
Title: v.Title,
Description: v.Description,
SourceURL: v.SourceURL,
Languages: v.Languages,
Tags: v.Tags,
ReleaseTime: v.ReleaseTime,
ThumbnailURL: v.ThumbnailURL,
FullLocalPath: v.FullLocalPath,
}
}
type PublishedVideo struct {
ClaimID string
NativeID string
Source string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
}
func (v PublishedVideo) ToPublishable() PublishableVideo {
return PublishableVideo {
ID: v.NativeID,
Source: v.Source,
ClaimName: v.ClaimName,
Title: v.Title,
Description: v.Description,
SourceURL: v.SourceURL,
Languages: v.Languages,
Tags: v.Tags,
ReleaseTime: v.ReleaseTime,
ThumbnailURL: v.ThumbnailURL,
FullLocalPath: v.FullLocalPath,
}
}
func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
if source.FullLocalPath == nil {
return nil, errors.New("Video is not cached locally")
}
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
if err != nil {
log.Errorf("Error sanitizing tags: %v", err)
return nil, err
}
descriptionSample := ""
if source.Description != nil {
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
}
info := whatlanggo.Detect(descriptionSample)
title := ""
if source.Title != nil {
title = *source.Title
}
info2 := whatlanggo.Detect(title)
var languages []string = nil
if info.IsReliable() && info.Lang.Iso6391() != "" {
language := info.Lang.Iso6391()
languages = []string{language}
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
language := info2.Lang.Iso6391()
languages = []string{language}
}
claimName := namer.NewNamer().GetNextName(title)
thumbnailURL := source.ThumbnailURL
if thumbnailURL == nil {
thumbnailURL = util.PtrToString("")
}
releaseTime := source.ReleaseTime
if releaseTime == nil {
releaseTime = util.PtrToInt64(time.Now().Unix())
}
processed := PublishableVideo {
ID: source.ID,
Source: source.Source,
ClaimName: claimName,
Title: title,
Description: getAbbrevDescription(source),
Languages: languages,
Tags: tags,
ReleaseTime: *releaseTime,
ThumbnailURL: *thumbnailURL,
FullLocalPath: *source.FullLocalPath,
}
log.Debugf("Video prepared for publication: %v", processed)
return &processed, nil
}
func getAbbrevDescription(v SourceVideo) string {
if v.Description == nil {
return v.SourceURL
}
additionalDescription := "\n...\n" + v.SourceURL
maxLength := 2800 - len(additionalDescription)
description := strings.TrimSpace(*v.Description)
if len(description) > maxLength {
description = description[:maxLength]
}
return description + additionalDescription
}
type VideoSource interface {
SourceName() string
GetVideo(id string) (*SourceVideo, error)
DeleteLocalCache(id string) error
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
}
type SourceScanIteratorResult struct {
Video *SourceVideo
Error error
}
type VideoPublisher interface {
Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error)
PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult
}
type PublishedVideoIteratorResult struct {
Video *PublishedVideo
Error error
}

190
local/localSDKPublisher.go Normal file
View file

@ -0,0 +1,190 @@
package local
import (
"errors"
"sort"
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type LocalSDKPublisher struct {
channelID string
publishBid float64
lbrynet *jsonrpc.Client
}
func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
lbrynet := jsonrpc.NewClient(sdkAddr)
lbrynet.SetRPCTimeout(5 * time.Minute)
status, err := lbrynet.Status()
if err != nil {
return nil, err
}
if !status.IsRunning {
return nil, errors.New("SDK is not running")
}
// Should check to see if the SDK owns the channel
// Should check to see if wallet is unlocked
// but jsonrpc.Client doesn't have WalletStatus method
// so skip for now
// Should check to see if streams are configured to be reflected and warn if not
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
// so use File.UploadingToReflector as a proxy for now
publisher := LocalSDKPublisher {
channelID: channelID,
publishBid: publishBid,
lbrynet: lbrynet,
}
return &publisher, nil
}
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) {
streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &video.Title,
Description: &video.Description,
Languages: video.Languages,
ThumbnailURL: &video.ThumbnailURL,
Tags: video.Tags,
},
ReleaseTime: &video.ReleaseTime,
ChannelID: &p.channelID,
License: util.PtrToString("Copyrighted (contact publisher)"),
}
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
if err != nil {
return "", nil, err
}
var claimID *string
for _, output := range txSummary.Outputs {
if output.Type == "claim" {
claimID = &output.ClaimID
break
}
}
if claimID == nil {
return "", nil, errors.New("Publish transaction did not have a claim output.")
}
if !reflectStream {
return "", nil, nil
}
done := make(chan error, 1)
go func() {
for {
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
if err != nil {
log.Errorf("Error finding file by txid: %v", err)
done <- err
return
}
if fileListResponse == nil {
log.Errorf("Could not find file in list with correct txid")
done <- err
return
}
fileStatus := fileListResponse.Items[fileIndex]
if fileStatus.IsFullyReflected {
log.Info("Stream is fully reflected")
break
}
if !fileStatus.UploadingToReflector {
log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
done <- errors.New("Stream is not being reflected (check lbrynet settings).")
return
}
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
time.Sleep(5 * time.Second)
}
done <- nil
}()
return *claimID, done, nil
}
func (p *LocalSDKPublisher) PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult {
videoCh := make(chan PublishedVideoIteratorResult, 10)
go func() {
defer close(videoCh)
for page := uint64(0); ; page++ {
streams, err := p.lbrynet.StreamList(nil, page, 100)
if err != nil {
log.Errorf("Error listing streams (page %d): %v", page, err)
errResult := PublishedVideoIteratorResult {
Error: err,
}
videoCh <- errResult
return
}
if len(streams.Items) == 0 {
return
}
for _, stream := range streams.Items {
if stream.ChannelID != p.channelID || stream.Value.GetStream().ReleaseTime < sinceTimestamp {
continue
}
languages := []string{}
for _, language := range stream.Value.Languages {
languages = append(languages, language.String())
}
video := PublishedVideo {
ClaimID: stream.ClaimID,
NativeID: "",
Source: "",
ClaimName: stream.Name,
Title: stream.Value.Title,
Description: stream.Value.Description,
Languages: languages,
Tags: stream.Value.Tags,
ReleaseTime: stream.Value.GetStream().ReleaseTime,
ThumbnailURL: stream.Value.Thumbnail.Url,
FullLocalPath: "",
}
videoResult := PublishedVideoIteratorResult {
Video: &video,
}
videoCh <- videoResult
}
}
}()
return videoCh
}
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20)
for {
if err != nil {
log.Errorf("Error getting file list page: %v", err)
return nil, 0, err
}
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
if index < len(response.Items) {
return response, index, nil
}
if response.Page >= response.TotalPages {
return nil, 0, nil
}
response, err = client.FileList(response.Page + 1, 20)
}
}

View file

@ -5,6 +5,7 @@
- LBRY SDK (what do we actually need this for?)
- youtube-dl
- enough space to cache stuff
- YouTube data API key
## Process
@ -15,6 +16,21 @@
- or easier, just error if no channel
- enough lbc in wallet?
### Getting a YouTube API key
To access the YouTube data API, you will first need some kind of google account.
The API has two methods of authentication, OAuth2 and API keys. This application uses API keys.
These API keys are basically like passwords, and so once obtained, they should not be shared.
The instructions for obtaining an API key are copied below from [here](https://developers.google.com/youtube/registering_an_application):
1. Open the [Credentials page](https://console.developers.google.com/apis/credentials) in the API Console.
2. Create an API key in the Console by clicking **Create credentials > API key**. You can restrict the key before using it in production by clicking **Restrict key** and selecting one of the **Restrictions**.
To keep your API keys secure, follow the [best practices for securely using API keys](https://cloud.google.com/docs/authentication/api-keys).
### Options to figure out what's already synced
- simplest: assume nothing is synced yet

676
local/syncDb.go Normal file
View file

@ -0,0 +1,676 @@
package local
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
log "github.com/sirupsen/logrus"
)
type SyncDb struct {
db *sql.DB
}
type SyncDbSummary struct {
Total int
CachedUnpublished int
UncachedUnpublished int
LatestKnown int64
LatestPublished int64
}
func NewSyncDb(path string) (*SyncDb, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
log.Errorf("Error opening cache DB at %s: %v", path, err)
return nil, err
}
cache := SyncDb {
db: db,
}
err = cache.ensureSchema()
if err != nil {
log.Errorf("Error while ensuring sync DB structure: %v", err)
return nil, err
}
return &cache, nil
}
func (c *SyncDb) Close() error {
return c.db.Close()
}
func (c *SyncDb) SaveKnownVideo(video SourceVideo) error {
if video.ID == "" {
log.Warnf("Trying to save a video with no ID: %v", video)
}
insertSql := `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO NOTHING;
`
r := SyncRecordFromSourceVideo(video)
_, err := c.db.Exec(
insertSql,
r.Source,
r.NativeID,
r.Title,
r.Description,
r.SourceURL,
r.ReleaseTime,
r.ThumbnailURL,
r.FullLocalPath,
)
return err
}
func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.ID,
video.ClaimName,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.ID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
return err
}
func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
claim_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_id = excluded.claim_id,
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.NativeID,
video.ClaimID,
video.ClaimName,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.NativeID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.NativeID, video.Languages)
return err
}
func (c *SyncDb) MarkVideoUncached(source, id string) error {
updateSql := `
UPDATE videos
SET full_local_path = NULL
WHERE source = ? AND native_id = ?;
`
_, err := c.db.Exec(
updateSql,
source,
id,
)
return err
}
func (c *SyncDb) _SaveVideoData(video SourceVideo) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.ID,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.ID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
return err
}
func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error {
upsertSql := `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path,
claim_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path,
claim_id = excluded.claim_id;
`
_, err := c.db.Exec(
upsertSql,
video.Source,
video.ID,
video.Title,
video.Description,
video.SourceURL,
video.ReleaseTime,
video.ThumbnailURL,
video.FullLocalPath,
claimID,
)
if err != nil {
return err
}
err = c.upsertTags(video.Source, video.ID, video.Tags)
if err != nil {
return err
}
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
return err
}
func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) {
selectSql := `
SELECT
claim_id
FROM videos
WHERE source = ? AND native_id = ?
`
row := c.db.QueryRow(selectSql, source, id)
var claimID sql.NullString
err := row.Scan(&claimID)
if err == sql.ErrNoRows {
return false, "", nil
} else if err != nil {
log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err)
return false, "", err
}
if claimID.Valid {
return true, claimID.String, nil
} else {
return false, "", nil
}
}
func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) {
selectSql := `
SELECT
full_local_path
FROM videos
WHERE source = ? AND native_id = ?
`
row := c.db.QueryRow(selectSql, source, id)
var localPath sql.NullString
err := row.Scan(&localPath)
if err == sql.ErrNoRows {
return false, "", nil
} else if err != nil {
log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err)
return false, "", err
}
if localPath.Valid {
return true, localPath.String, nil
} else {
return false, "", nil
}
}
func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) {
selectSql := `
SELECT
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path,
claim_id
FROM videos
WHERE source = ? AND native_id = ?
`
row := c.db.QueryRow(selectSql, source, id)
var record SyncRecord
err := row.Scan(
&record.NativeID,
&record.Title,
&record.Description,
&record.SourceURL,
&record.ReleaseTime,
&record.ThumbnailURL,
&record.FullLocalPath,
&record.ClaimID,
)
if err == sql.ErrNoRows {
log.Debugf("Data for %s:%s is not in the sync DB", source, id)
return nil, nil
} else if err != nil {
log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err)
return nil, err
}
if includeTags {
tags, err := c.getTags(source, id)
if err != nil {
return nil, err
}
record.Tags = &tags
}
if includeLanguages {
languages, err := c.getLanguages(source, id)
if err != nil {
return nil, err
}
record.Languages = &languages
}
return &record, nil
}
func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) {
selectSql := `
SELECT
native_id
FROM videos
WHERE source = ? AND claim_id IS NULL
`
ids := []string{}
rows, err := c.db.Query(selectSql, source)
if err != nil {
return ids, err
}
defer rows.Close()
for rows.Next() {
var id string
err = rows.Scan(&id)
if err != nil {
return ids, err
}
ids = append(ids, id)
}
return ids, nil
}
func (c *SyncDb) GetSummary() (*SyncDbSummary, error) {
selectSql := `
SELECT
COUNT() AS total,
COUNT(v_unpub.full_local_path) AS cached_unpublished,
COUNT(v_all.claim_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished,
MAX(v_all.release_time) AS latest_known,
MAX(v_pub.release_time) AS latest_published
FROM videos v_all
LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL
LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL
`
row := c.db.QueryRow(selectSql)
var summary SyncDbSummary
var latestKnown, latestPublished sql.NullInt64
err := row.Scan(
&summary.Total,
&summary.CachedUnpublished,
&summary.UncachedUnpublished,
&latestKnown,
&latestPublished,
)
if err != nil {
log.Errorf("Error querying sync DB summary: %v", err)
return nil, err
}
if latestKnown.Valid {
summary.LatestKnown = latestKnown.Int64
}
if latestPublished.Valid {
summary.LatestPublished = latestPublished.Int64
}
return &summary, nil
}
func (c *SyncDb) ensureSchema() error {
createSql := `
CREATE TABLE IF NOT EXISTS videos (
source TEXT,
native_id TEXT,
title TEXT,
description TEXT,
source_url TEXT,
release_time INT,
thumbnail_url TEXT,
full_local_path TEXT,
claim_name TEXT,
claim_id TEXT,
PRIMARY KEY (source, native_id)
);
CREATE TABLE IF NOT EXISTS video_tags (
source TEXT NOT NULL,
native_id TEXT NOT NULL,
tag TEXT NOT NULL,
UNIQUE (source, native_id, tag)
);
CREATE TABLE IF NOT EXISTS video_languages (
source TEXT NOT NULL,
native_id TEXT NOT NULL,
language TEXT NOT NULL,
UNIQUE (source, native_id, language)
);
`
_, err := c.db.Exec(createSql)
return err
}
func (c *SyncDb) upsertTags(source, id string, tags []string) error {
upsertSql := `
INSERT INTO video_tags (
source,
native_id,
tag
) VALUES (?, ?, ?)
ON CONFLICT (source, native_id, tag)
DO NOTHING;
`
for _, tag := range tags {
_, err := c.db.Exec(
upsertSql,
source,
id,
tag,
)
if err != nil {
log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err)
return err
}
}
return nil
}
func (c *SyncDb) getTags(source, id string) ([]string, error) {
selectSql := `
SELECT tag
FROM video_tags
WHERE source = ? AND native_id = ?;
`
rows, err := c.db.Query(selectSql, source, id)
if err != nil {
log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
defer rows.Close()
var tags []string
for rows.Next() {
var tag string
err = rows.Scan(&tag)
if err != nil {
log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
tags = append(tags, tag)
}
return tags, nil
}
func (c *SyncDb) upsertLanguages(source, id string, languages []string) error {
upsertSql := `
INSERT INTO video_languages (
source,
native_id,
language
) VALUES (?, ?, ?)
ON CONFLICT (source, native_id, language)
DO NOTHING;
`
for _, language := range languages {
_, err := c.db.Exec(
upsertSql,
source,
id,
language,
)
if err != nil {
log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err)
return err
}
}
return nil
}
func (c *SyncDb) getLanguages(source, id string) ([]string, error) {
selectSql := `
SELECT language
FROM video_languages
WHERE source = ? AND native_id = ?;
`
rows, err := c.db.Query(selectSql, source, id)
if err != nil {
log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
defer rows.Close()
var languages []string
for rows.Next() {
var language string
err = rows.Scan(&language)
if err != nil {
log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err)
return nil, err
}
languages = append(languages, language)
}
return languages, nil
}
type SyncRecord struct {
Source string
NativeID string
Title sql.NullString
Description sql.NullString
SourceURL sql.NullString
ReleaseTime sql.NullInt64
ThumbnailURL sql.NullString
FullLocalPath sql.NullString
ClaimID sql.NullString
Tags *[]string
Languages *[]string
}
func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord {
r := SyncRecord {
Source: v.Source,
NativeID: v.ID,
SourceURL: sql.NullString { String: v.SourceURL, Valid: true },
}
if v.Title != nil {
r.Title = sql.NullString { String: *v.Title, Valid: true }
}
if v.Description != nil {
r.Description = sql.NullString { String: *v.Description, Valid: true }
}
if v.ThumbnailURL != nil {
r.ThumbnailURL = sql.NullString { String: *v.ThumbnailURL, Valid: true }
}
if v.FullLocalPath != nil {
r.FullLocalPath = sql.NullString { String: *v.FullLocalPath, Valid: true }
}
if v.ReleaseTime != nil {
r.ReleaseTime = sql.NullInt64 { Int64: *v.ReleaseTime, Valid: true }
}
if len(v.Tags) > 0 {
r.Tags = &v.Tags
}
if len(v.Languages) > 0 {
r.Languages = &v.Languages
}
return r
}
func (r *SyncRecord) ToPublishableVideo() *PublishableVideo {
if !(r.Title.Valid &&
r.Description.Valid &&
r.SourceURL.Valid &&
r.ReleaseTime.Valid &&
r.ThumbnailURL.Valid &&
r.FullLocalPath.Valid &&
r.Tags != nil &&
r.Languages != nil) {
return nil
}
video := PublishableVideo {
ID: r.NativeID,
Source: r.Source,
Description: r.Description.String,
SourceURL: r.SourceURL.String,
ReleaseTime: r.ReleaseTime.Int64,
ThumbnailURL: r.ThumbnailURL.String,
FullLocalPath: r.FullLocalPath.String,
Tags: *r.Tags,
Languages: *r.Languages,
}
return &video
}

View file

@ -0,0 +1,51 @@
package local
type YouTubeChannelScanner interface {
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
}
type YouTubeAPIChannelScanner struct {
api *YouTubeAPI
channel string
}
func NewYouTubeAPIChannelScanner(apiKey, channel string) (*YouTubeAPIChannelScanner) {
scanner := YouTubeAPIChannelScanner {
api: NewYouTubeAPI(apiKey),
channel: channel,
}
return &scanner
}
func (s *YouTubeAPIChannelScanner) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
videoCh := make(chan SourceScanIteratorResult, 10)
go func() {
defer close(videoCh)
for firstRun, nextPage := true, ""; firstRun || nextPage != ""; {
var videos []SourceVideo
var err error
firstRun = false
videos, nextPage, err = s.api.GetChannelVideosPage(s.channel, sinceTimestamp, nextPage)
if err != nil {
videoCh <- SourceScanIteratorResult {
Video: nil,
Error: err,
}
return
}
for _, video := range videos {
outVideo := video
videoCh <- SourceScanIteratorResult {
Video: &outVideo,
Error: nil,
}
}
}
}()
return videoCh
}

74
local/youtubeEnricher.go Normal file
View file

@ -0,0 +1,74 @@
package local
import (
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type YouTubeVideoEnricher interface {
EnrichMissing(source *SourceVideo) error
}
type YouTubeAPIVideoEnricher struct {
api *YouTubeAPI
}
func NewYouTubeAPIVideoEnricher(apiKey string) (*YouTubeAPIVideoEnricher) {
enricher := YouTubeAPIVideoEnricher{
api: NewYouTubeAPI(apiKey),
}
return &enricher
}
func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
if source.ReleaseTime != nil {
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
return nil
}
snippet, err := e.api.GetVideoSnippet(source.ID)
if err != nil {
log.Errorf("Error snippet data for video %s: %v", err)
return err
}
publishedAt, err := time.Parse(time.RFC3339, snippet.PublishedAt)
if err != nil {
log.Errorf("Error converting publishedAt to timestamp: %v", err)
} else {
source.ReleaseTime = util.PtrToInt64(publishedAt.Unix())
}
return nil
}
type CacheVideoEnricher struct {
syncDB *SyncDb
}
func NewCacheVideoEnricher(syncDB *SyncDb) *CacheVideoEnricher {
enricher := CacheVideoEnricher {
syncDB,
}
return &enricher
}
func (e *CacheVideoEnricher) EnrichMissing(source *SourceVideo) error {
if source.ReleaseTime != nil {
log.Debugf("Video %s does not need enrichment. YouTubeAPIVideoEnricher is skipping.", source.ID)
return nil
}
cached, err := e.syncDB.GetVideoRecord(source.Source, source.ID, false, false)
if err != nil {
log.Errorf("Error getting cached video %s: %v", source.ID, err)
return err
}
if cached != nil && cached.ReleaseTime.Valid {
source.ReleaseTime = &cached.ReleaseTime.Int64
}
return nil
}

157
local/ytapi.go Normal file
View file

@ -0,0 +1,157 @@
package local
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/util"
)
type YouTubeAPI struct {
apiKey string
client *http.Client
}
func NewYouTubeAPI(apiKey string) (*YouTubeAPI) {
client := &http.Client {
Transport: &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
DisableCompression: true,
},
}
api := YouTubeAPI {
apiKey: apiKey,
client: client,
}
return &api
}
func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/videos", nil)
if err != nil {
log.Errorf("Error creating http client for YouTube API: %v", err)
return nil, err
}
query := req.URL.Query()
query.Add("part", "snippet")
query.Add("id", videoID)
query.Add("key", a.apiKey)
req.URL.RawQuery = query.Encode()
req.Header.Add("Accept", "application/json")
resp, err := a.client.Do(req)
defer resp.Body.Close()
if err != nil {
log.Errorf("Error from YouTube API: %v", err)
return nil, err
}
body, err := io.ReadAll(resp.Body)
log.Tracef("Response from YouTube API: %s", string(body[:]))
var result videoListResponse
err = json.Unmarshal(body, &result)
if err != nil {
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
return nil, err
}
if len(result.Items) != 1 {
err = fmt.Errorf("YouTube API responded with incorrect number of snippets (%d) while attempting to get snippet data for video %s", len(result.Items), videoID)
return nil, err
}
return &result.Items[0].Snippet, nil
}
func (a *YouTubeAPI) GetChannelVideosPage(channelID string, publishedAfter int64, pageToken string) ([]SourceVideo, string, error) {
req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/search", nil)
if err != nil {
log.Errorf("Error creating http client for YouTube API: %v", err)
return []SourceVideo{}, "", err
}
query := req.URL.Query()
query.Add("part", "snippet")
query.Add("type", "video")
query.Add("channelId", channelID)
query.Add("publishedAfter", time.Unix(publishedAfter, 0).Format(time.RFC3339))
query.Add("maxResults", "5")
if pageToken != "" {
query.Add("pageToken", pageToken)
}
query.Add("key", a.apiKey)
req.URL.RawQuery = query.Encode()
req.Header.Add("Accept", "application/json")
resp, err := a.client.Do(req)
defer resp.Body.Close()
if err != nil {
log.Errorf("Error from YouTube API: %v", err)
return []SourceVideo{}, "", err
}
body, err := io.ReadAll(resp.Body)
log.Tracef("Response from YouTube API: %s", string(body[:]))
var result videoSearchResponse
err = json.Unmarshal(body, &result)
if err != nil {
log.Errorf("Error deserializing video list response from YouTube API: %v", err)
return []SourceVideo{}, "", err
}
videos := []SourceVideo{}
for _, item := range result.Items {
var releaseTime *int64
publishedAt, err := time.Parse(time.RFC3339, item.Snippet.PublishedAt)
if err != nil {
log.Errorf("Unable to parse publish time of %s while scanning YouTube channel %s: %v", item.ID.VideoID, channelID)
releaseTime = nil
} else {
releaseTime = util.PtrToInt64(publishedAt.Unix())
}
video := SourceVideo {
ID: item.ID.VideoID,
Source: "YouTube",
ReleaseTime: releaseTime,
}
videos = append(videos, video)
}
return videos, result.NextPageToken, nil
}
type videoListResponse struct {
NextPageToken string `json:"nextPageToken"`
Items []struct {
ID string `json:"id"`
Snippet VideoSnippet `json:"snippet"`
} `json:"items"`
}
type videoSearchResponse struct {
NextPageToken string `json:"nextPageToken"`
Items []struct {
ID struct{
VideoID string `json:"videoId"`
} `json:"id"`
Snippet VideoSnippet `json:"snippet"`
} `json:"items"`
}
type VideoSnippet struct {
PublishedAt string `json:"publishedAt"`
}

239
local/ytdl.go Normal file
View file

@ -0,0 +1,239 @@
package local
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"strings"
log "github.com/sirupsen/logrus"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
)
type Ytdl struct {
DownloadDir string
}
func NewYtdl(downloadDir string) (*Ytdl, error) {
// TODO validate download dir
y := Ytdl {
DownloadDir: downloadDir,
}
return &y, nil
}
func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) {
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
return nil, err
}
metadataBytes, err := os.ReadFile(metadataPath)
if err != nil {
return nil, err
}
var metadata *ytdl.YtdlVideo
err = json.Unmarshal(metadataBytes, &metadata)
if err != nil {
return nil, err
}
return metadata, nil
}
func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) {
basePath := path.Join(y.DownloadDir, videoID)
metadataPath := basePath + ".info.json"
_, err := os.Stat(metadataPath)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Error determining if video metadata already exists: %v", err)
return "", err
} else if err != nil {
log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID)
err = downloadVideoMetadata(basePath, videoID)
if err != nil {
return "", err
}
}
return metadataPath, nil
}
func (y *Ytdl) GetVideoFile(videoID string) (string, error) {
videoPath, err := findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
return "", err
}
if videoPath != nil {
return *videoPath, nil
}
basePath := path.Join(y.DownloadDir, videoID)
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
log.Errorf("Error getting metadata path in preparation for video download: %v", err)
return "", err
}
err = downloadVideo(basePath, metadataPath)
if err != nil {
return "", nil
}
videoPath, err = findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err)
return "", err
}
if videoPath == nil {
return "", errors.New("Could not find a downloaded video after successful download.")
}
return *videoPath, nil
}
func (y *Ytdl) DeleteVideoFiles(videoID string) error {
files, err := ioutil.ReadDir(y.DownloadDir)
if err != nil {
return err
}
for _, f := range files {
if f.IsDir() {
continue
}
if strings.Contains(f.Name(), videoID) {
videoPath := path.Join(y.DownloadDir, f.Name())
err = os.Remove(videoPath)
if err != nil {
log.Errorf("Error while deleting file %s: %v", y.DownloadDir, err)
return err
}
}
}
return nil
}
func deleteFile(path string) error {
_, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Error determining if file %s exists: %v", path, err)
return err
} else if err != nil {
log.Debugf("File %s does not exist. Skipping deletion.", path)
return nil
}
return os.Remove(path)
}
func findDownloadedVideo(videoDir, videoID string) (*string, error) {
files, err := ioutil.ReadDir(videoDir)
if err != nil {
return nil, err
}
for _, f := range files {
if f.IsDir() {
continue
}
if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) {
videoPath := path.Join(videoDir, f.Name())
return &videoPath, nil
}
}
return nil, nil
}
func downloadVideoMetadata(basePath, videoID string) error {
ytdlArgs := []string{
"--skip-download",
"--write-info-json",
"--force-overwrites",
fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
"--cookies",
"cookies.txt",
"-o",
basePath,
}
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
log.Debug(output)
return err
}
func downloadVideo(basePath, metadataPath string) error {
ytdlArgs := []string{
"--no-progress",
"-o",
basePath,
"--merge-output-format",
"mp4",
"--postprocessor-args",
"ffmpeg:-movflags faststart",
"--abort-on-unavailable-fragment",
"--fragment-retries",
"1",
"--cookies",
"cookies.txt",
"--extractor-args",
"youtube:player_client=android",
"--load-info-json",
metadataPath,
"-fbestvideo[ext=mp4][vcodec!*=av01][height<=720]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]",
}
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
log.Debug(output)
return err
}
func runCmd(cmd *exec.Cmd) ([]string, error) {
log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
var err error
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
outLog, err := ioutil.ReadAll(stdout)
if err != nil {
return nil, err
}
errorLog, err := ioutil.ReadAll(stderr)
if err != nil {
return nil, err
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case err := <-done:
if err != nil {
log.Error(string(errorLog))
return nil, err
}
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
}
}

101
local/ytdlVideoSource.go Normal file
View file

@ -0,0 +1,101 @@
package local
import (
log "github.com/sirupsen/logrus"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
)
type YtdlVideoSource struct {
downloader Ytdl
channelScanner YouTubeChannelScanner
enrichers []YouTubeVideoEnricher
}
func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig, syncDB *SyncDb) (*YtdlVideoSource, error) {
ytdl, err := NewYtdl(downloadDir)
if err != nil {
return nil, err
}
source := YtdlVideoSource {
downloader: *ytdl,
}
if syncDB != nil {
source.enrichers = append(source.enrichers, NewCacheVideoEnricher(syncDB))
}
if config.APIKey != "" {
ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey)
source.enrichers = append(source.enrichers, ytapiEnricher)
source.channelScanner = NewYouTubeAPIChannelScanner(config.APIKey, config.ChannelID)
}
if source.channelScanner == nil {
log.Warnf("No means of scanning source channels has been provided")
}
return &source, nil
}
func (s *YtdlVideoSource) SourceName() string {
return "YouTube"
}
func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
metadata, err := s.downloader.GetVideoMetadata(id)
if err != nil {
return nil, err
}
videoPath, err := s.downloader.GetVideoFile(id)
if err != nil {
return nil, err
}
var bestThumbnail *ytdl.Thumbnail = nil
for i, thumbnail := range metadata.Thumbnails {
if i == 0 || bestThumbnail.Width < thumbnail.Width {
bestThumbnail = &thumbnail
}
}
sourceVideo := SourceVideo {
ID: id,
Source: "YouTube",
Title: &metadata.Title,
Description: &metadata.Description,
SourceURL: "\nhttps://www.youtube.com/watch?v=" + id,
Languages: []string{},
Tags: metadata.Tags,
ReleaseTime: nil,
ThumbnailURL: &bestThumbnail.URL,
FullLocalPath: &videoPath,
}
for _, enricher := range s.enrichers {
err = enricher.EnrichMissing(&sourceVideo)
if err != nil {
log.Warnf("Error enriching video %s, continuing enrichment: %v", id, err)
}
}
log.Debugf("Source video retrieved via ytdl: %v", sourceVideo)
return &sourceVideo, nil
}
func (s *YtdlVideoSource) DeleteLocalCache(id string) error {
return s.downloader.DeleteVideoFiles(id)
}
func (s *YtdlVideoSource) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
if s.channelScanner != nil {
return s.channelScanner.Scan(sinceTimestamp)
}
videoCh := make(chan SourceScanIteratorResult, 1)
close(videoCh)
return videoCh
}