Finish v2 (sync whole channel)

This commit is contained in:
pseudoscalar 2022-01-25 09:36:46 -06:00
parent 7489213439
commit aa16d32135
7 changed files with 747 additions and 96 deletions

View file

@ -20,6 +20,7 @@ type SyncContext struct {
DryRun bool DryRun bool
KeepCache bool KeepCache bool
ReflectStreams bool ReflectStreams bool
ForceChannelScan bool
TempDir string TempDir string
SyncDbPath string SyncDbPath string
LbrynetAddr string LbrynetAddr string
@ -75,6 +76,7 @@ func AddCommand(rootCmd *cobra.Command) {
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
cmd.Flags().BoolVar(&syncContext.ForceChannelScan, "force-rescan", false, "Rescan channel to fill the sync DB.")
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB") cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB")
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
@ -101,13 +103,6 @@ func localCmd(cmd *cobra.Command, args []string) {
log.Error(err) log.Error(err)
return return
} }
videoID := syncContext.VideoID
if videoID == "" {
log.Errorf("Only single video mode is supported currently. Please provided a video ID.")
return
}
log.Debugf("Running sync for video ID %s", videoID)
syncDB, err := NewSyncDb(syncContext.SyncDbPath) syncDB, err := NewSyncDb(syncContext.SyncDbPath)
if err != nil { if err != nil {
@ -116,17 +111,6 @@ func localCmd(cmd *cobra.Command, args []string) {
} }
defer syncDB.Close() defer syncDB.Close()
isSynced, claimID, err := syncDB.IsVideoPublished("YouTube", videoID)
if err != nil {
log.Errorf("Error checking if video is already synced: %v", err)
return
}
if isSynced {
log.Infof("Video %s is already published as %s.", videoID, claimID)
return
}
var publisher VideoPublisher var publisher VideoPublisher
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
if err != nil { if err != nil {
@ -136,37 +120,121 @@ func localCmd(cmd *cobra.Command, args []string) {
var videoSource VideoSource var videoSource VideoSource
if syncContext.YouTubeSourceConfig != nil { if syncContext.YouTubeSourceConfig != nil {
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig) videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig, syncDB)
if err != nil { if err != nil {
log.Errorf("Error setting up video source: %v", err) log.Errorf("Error setting up video source: %v", err)
return return
} }
} }
latestPublishedReleaseTime := int64(0)
latestKnownReleaseTime := int64(0)
if syncContext.ForceChannelScan {
log.Infof("Channel scan is being forced.")
} else {
dbSummary, err := syncDB.GetSummary()
if err != nil {
log.Errorf("Error getting sync DB summary for update scan: %v", err)
return
}
latestPublishedReleaseTime = dbSummary.LatestPublished
latestKnownReleaseTime = dbSummary.LatestKnown
}
log.Debugf("Latest known release time: %d", latestKnownReleaseTime)
for result := range videoSource.Scan(latestKnownReleaseTime) {
if result.Error != nil {
log.Errorf("Error while discovering new videos from source: %v", result.Error)
} else {
syncDB.SaveKnownVideo(*result.Video)
}
}
log.Debugf("Latest published release time: %d", latestPublishedReleaseTime)
for result := range publisher.PublishedVideoIterator(latestPublishedReleaseTime) {
if result.Error != nil {
log.Errorf("Error while discovering published videos: %v", result.Error)
} else {
syncDB.SavePublishedVideo(*result.Video)
}
}
var videoIDs []string
if syncContext.VideoID == "" {
videoIDs, err = syncDB.GetUnpublishedIDs(videoSource.SourceName())
if err != nil {
log.Errorf("Error getting unpublished videos from sync DB: %v", err)
return
}
} else {
videoIDs = []string{ syncContext.VideoID }
}
log.Debugf("Syncing videos: %v", videoIDs)
for _, videoID := range videoIDs {
err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID) err = syncVideo(syncContext, syncDB, videoSource, publisher, videoID)
if err != nil { if err != nil {
log.Errorf("Error syncing %s: %v", videoID, err) log.Errorf("Error syncing %s: %v", videoID, err)
return return
} }
}
log.Info("Done") log.Info("Done")
} }
func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error { func cacheVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, videoID string) (*PublishableVideo, error) {
log.Debugf("Ensuring video %s:%s is cached", videoSource.SourceName(), videoID)
videoRecord, err := syncDB.GetVideoRecord(videoSource.SourceName(), videoID, true, true)
if err != nil {
log.Errorf("Error checking if video is already cached: %v", err)
return nil, err
}
if videoRecord != nil && videoRecord.FullLocalPath.Valid {
log.Debugf("%s:%s is already cached.", videoSource.SourceName(), videoID)
video := videoRecord.ToPublishableVideo()
if video == nil {
log.Warnf("%s:%s appears to be cached locally, but has missing data. Caching again.")
}
return video, nil
}
log.Debugf("%s:%s is not cached locally. Caching now.", videoSource.SourceName(), videoID)
sourceVideo, err := videoSource.GetVideo(videoID) sourceVideo, err := videoSource.GetVideo(videoID)
if err != nil { if err != nil {
log.Errorf("Error getting source video: %v", err) log.Errorf("Error getting source video: %v", err)
return err return nil, err
}
err = syncDB.SaveVideoData(*sourceVideo)
if err != nil {
log.Errorf("Error saving video data: %v", err)
return err
} }
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
if err != nil { if err != nil {
log.Errorf("Error processing source video for publishing: %v", err) log.Errorf("Error processing source video for publishing: %v", err)
return nil, err
}
err = syncDB.SavePublishableVideo(*processedVideo)
if err != nil {
log.Errorf("Error saving video data: %v", err)
return nil, err
}
return processedVideo, nil
}
func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource, publisher VideoPublisher, videoID string) error {
log.Debugf("Running sync for video %s:%s", videoSource.SourceName(), videoID)
isSynced, claimID, err := syncDB.IsVideoPublished(videoSource.SourceName(), videoID)
if err != nil {
log.Errorf("Error checking if video is already synced: %v", err)
return err
}
if isSynced {
log.Infof("Video %s:%s is already published as %s.", videoSource.SourceName(), videoID, claimID)
return nil
}
processedVideo, err := cacheVideo(syncContext, syncDB, videoSource, videoID)
if err != nil {
log.Errorf("Error ensuring video is cached prior to publication: %v", err)
return err return err
} }
@ -181,7 +249,7 @@ func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource,
log.Errorf("Error publishing video: %v", err) log.Errorf("Error publishing video: %v", err)
return err return err
} }
err = syncDB.SaveVideoPublication(*processedVideo, claimID) err = syncDB.SavePublishedVideo((*processedVideo).ToPublished(claimID))
if err != nil { if err != nil {
// Sync DB is corrupted after getting here // Sync DB is corrupted after getting here
// and will allow double publication. // and will allow double publication.
@ -202,6 +270,11 @@ func syncVideo(syncContext SyncContext, syncDB *SyncDb, videoSource VideoSource,
if !syncContext.KeepCache { if !syncContext.KeepCache {
log.Infof("Deleting local files.") log.Infof("Deleting local files.")
err = syncDB.MarkVideoUncached(videoSource.SourceName(), videoID)
if err != nil {
log.Errorf("Error marking video %s:%s as uncached in syncDB", videoSource.SourceName(), videoID)
return err
}
err = videoSource.DeleteLocalCache(videoID) err = videoSource.DeleteLocalCache(videoID)
if err != nil { if err != nil {
log.Errorf("Error deleting local files for video %s: %v", videoID, err) log.Errorf("Error deleting local files for video %s: %v", videoID, err)
@ -222,7 +295,7 @@ type SourceVideo struct {
Tags []string Tags []string
ReleaseTime *int64 ReleaseTime *int64
ThumbnailURL *string ThumbnailURL *string
FullLocalPath string FullLocalPath *string
} }
type PublishableVideo struct { type PublishableVideo struct {
@ -239,7 +312,59 @@ type PublishableVideo struct {
FullLocalPath string FullLocalPath string
} }
// ToPublished builds the PublishedVideo that corresponds to this publishable
// video once it has been published under claimID. The receiver's ID becomes
// the result's NativeID; every other field is carried over as-is (the
// Languages and Tags slices are shared with the receiver, not copied).
func (v PublishableVideo) ToPublished(claimID string) PublishedVideo {
	var published PublishedVideo

	// Identity of the claim and of the video at its source.
	published.ClaimID = claimID
	published.NativeID = v.ID
	published.Source = v.Source
	published.ClaimName = v.ClaimName

	// Descriptive metadata.
	published.Title = v.Title
	published.Description = v.Description
	published.SourceURL = v.SourceURL
	published.Languages = v.Languages
	published.Tags = v.Tags
	published.ReleaseTime = v.ReleaseTime
	published.ThumbnailURL = v.ThumbnailURL

	// Location of the locally cached file.
	published.FullLocalPath = v.FullLocalPath

	return published
}
// PublishedVideo describes a video together with the claim it has been
// published under.
type PublishedVideo struct {
	// ClaimID is the ID of the claim the video was published as.
	ClaimID string
	// NativeID is the video's ID at its source; Source names that source.
	NativeID, Source string
	// ClaimName is the name of the published claim.
	ClaimName string
	// Title and Description are the video's descriptive text.
	Title, Description string
	// SourceURL is the video's URL at its source.
	SourceURL string
	// Languages and Tags are the video's language codes and tags.
	Languages, Tags []string
	// ReleaseTime is the release time as an integer timestamp.
	ReleaseTime int64
	// ThumbnailURL is the URL of the video's thumbnail.
	ThumbnailURL string
	// FullLocalPath is the path of the locally cached video file, if any.
	FullLocalPath string
}
// ToPublishable converts a published video back into a PublishableVideo.
// NativeID is mapped onto ID and the claim ID is dropped; all remaining
// fields are carried over as-is (Languages and Tags share their backing
// arrays with the receiver).
func (v PublishedVideo) ToPublishable() PublishableVideo {
	var publishable PublishableVideo

	publishable.ID = v.NativeID
	publishable.Source = v.Source
	publishable.ClaimName = v.ClaimName
	publishable.Title = v.Title
	publishable.Description = v.Description
	publishable.SourceURL = v.SourceURL
	publishable.Languages = v.Languages
	publishable.Tags = v.Tags
	publishable.ReleaseTime = v.ReleaseTime
	publishable.ThumbnailURL = v.ThumbnailURL
	publishable.FullLocalPath = v.FullLocalPath

	return publishable
}
func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) { func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
if source.FullLocalPath == nil {
return nil, errors.New("Video is not cached locally")
}
tags, err := tags_manager.SanitizeTags(source.Tags, channelID) tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
if err != nil { if err != nil {
log.Errorf("Error sanitizing tags: %v", err) log.Errorf("Error sanitizing tags: %v", err)
@ -289,7 +414,7 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab
Tags: tags, Tags: tags,
ReleaseTime: *releaseTime, ReleaseTime: *releaseTime,
ThumbnailURL: *thumbnailURL, ThumbnailURL: *thumbnailURL,
FullLocalPath: source.FullLocalPath, FullLocalPath: *source.FullLocalPath,
} }
log.Debugf("Video prepared for publication: %v", processed) log.Debugf("Video prepared for publication: %v", processed)
@ -313,10 +438,23 @@ func getAbbrevDescription(v SourceVideo) string {
} }
type VideoSource interface { type VideoSource interface {
SourceName() string
GetVideo(id string) (*SourceVideo, error) GetVideo(id string) (*SourceVideo, error)
DeleteLocalCache(id string) error DeleteLocalCache(id string) error
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
}
type SourceScanIteratorResult struct {
Video *SourceVideo
Error error
} }
type VideoPublisher interface { type VideoPublisher interface {
Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error)
PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult
}
type PublishedVideoIteratorResult struct {
Video *PublishedVideo
Error error
} }

View file

@ -48,7 +48,7 @@ func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*Local
return &publisher, nil return &publisher, nil
} }
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) { func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) {
streamCreateOptions := jsonrpc.StreamCreateOptions { streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions { ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &video.Title, Title: &video.Title,
@ -116,6 +116,60 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool)
return *claimID, done, nil return *claimID, done, nil
} }
// PublishedVideoIterator lists the streams known to the SDK and emits those
// that belong to this publisher's channel and whose release time is not
// earlier than sinceTimestamp.
//
// Results arrive on the returned channel (buffer size 10), which is closed
// once listing finishes. Streams are fetched in pages of 100, starting at
// page 0, until an empty page is returned. If a page request fails, a single
// result carrying the error is sent and iteration stops.
//
// Emitted videos carry only claim-side data: NativeID, Source, SourceURL and
// FullLocalPath are left empty.
func (p *LocalSDKPublisher) PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult {
	results := make(chan PublishedVideoIteratorResult, 10)

	go func() {
		defer close(results)

		page := uint64(0)
		for {
			listing, err := p.lbrynet.StreamList(nil, page, 100)
			if err != nil {
				log.Errorf("Error listing streams (page %d): %v", page, err)
				results <- PublishedVideoIteratorResult{Error: err}
				return
			}

			// An empty page marks the end of the listing.
			if len(listing.Items) == 0 {
				return
			}

			for _, item := range listing.Items {
				// Only streams of our channel released at or after the cutoff are reported.
				if item.ChannelID == p.channelID && item.Value.GetStream().ReleaseTime >= sinceTimestamp {
					langs := make([]string, 0, len(item.Value.Languages))
					for _, lang := range item.Value.Languages {
						langs = append(langs, lang.String())
					}

					results <- PublishedVideoIteratorResult{
						Video: &PublishedVideo{
							ClaimID:       item.ClaimID,
							NativeID:      "",
							Source:        "",
							ClaimName:     item.Name,
							Title:         item.Value.Title,
							Description:   item.Value.Description,
							Languages:     langs,
							Tags:          item.Value.Tags,
							ReleaseTime:   item.Value.GetStream().ReleaseTime,
							ThumbnailURL:  item.Value.Thumbnail.Url,
							FullLocalPath: "",
						},
					}
				}
			}

			page++
		}
	}()

	return results
}
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed // if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20) response, err := client.FileList(0, 20)

View file

@ -10,6 +10,14 @@ type SyncDb struct {
db *sql.DB db *sql.DB
} }
// SyncDbSummary holds aggregate figures about the contents of the sync DB,
// as filled in by SyncDb.GetSummary.
type SyncDbSummary struct {
	// Total is the row count of the summary query; CachedUnpublished and
	// UncachedUnpublished split the unpublished videos by whether a local
	// path is recorded for them.
	Total, CachedUnpublished, UncachedUnpublished int
	// LatestKnown is the greatest release time of any recorded video and
	// LatestPublished the greatest release time among published ones; both
	// stay 0 when no such value exists.
	LatestKnown, LatestPublished int64
}
func NewSyncDb(path string) (*SyncDb, error) { func NewSyncDb(path string) (*SyncDb, error) {
db, err := sql.Open("sqlite3", path) db, err := sql.Open("sqlite3", path)
if err != nil { if err != nil {
@ -33,7 +41,157 @@ func (c *SyncDb) Close() error {
return c.db.Close() return c.db.Close()
} }
func (c *SyncDb) SaveVideoData(video SourceVideo) error { func (c *SyncDb) SaveKnownVideo(video SourceVideo) error {
if video.ID == "" {
log.Warnf("Trying to save a video with no ID: %v", video)
}
insertSql := `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO NOTHING;
`
r := SyncRecordFromSourceVideo(video)
_, err := c.db.Exec(
insertSql,
r.Source,
r.NativeID,
r.Title,
r.Description,
r.SourceURL,
r.ReleaseTime,
r.ThumbnailURL,
r.FullLocalPath,
)
return err
}
// SavePublishableVideo writes a video that is ready for publication to the
// sync DB. The row keyed by (source, native_id) is inserted, or, if it already
// exists, its claim name, metadata and local path are overwritten; claim_id is
// not touched. The video's tags and languages are then upserted as well.
// The first error encountered is returned.
func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error {
	const upsertQuery = `
INSERT INTO videos (
source,
native_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
	if _, err := c.db.Exec(
		upsertQuery,
		video.Source,
		video.ID,
		video.ClaimName,
		video.Title,
		video.Description,
		video.SourceURL,
		video.ReleaseTime,
		video.ThumbnailURL,
		video.FullLocalPath,
	); err != nil {
		return err
	}

	if err := c.upsertTags(video.Source, video.ID, video.Tags); err != nil {
		return err
	}

	return c.upsertLanguages(video.Source, video.ID, video.Languages)
}
// SavePublishedVideo records a published video in the sync DB. The row keyed
// by (source, native_id) is inserted, or, if it already exists, its claim ID,
// claim name, metadata and local path are overwritten. The video's tags and
// languages are then upserted as well. The first error encountered is
// returned.
func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error {
	const upsertQuery = `
INSERT INTO videos (
source,
native_id,
claim_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_id = excluded.claim_id,
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
	if _, err := c.db.Exec(
		upsertQuery,
		video.Source,
		video.NativeID,
		video.ClaimID,
		video.ClaimName,
		video.Title,
		video.Description,
		video.SourceURL,
		video.ReleaseTime,
		video.ThumbnailURL,
		video.FullLocalPath,
	); err != nil {
		return err
	}

	if err := c.upsertTags(video.Source, video.NativeID, video.Tags); err != nil {
		return err
	}

	return c.upsertLanguages(video.Source, video.NativeID, video.Languages)
}
// MarkVideoUncached clears the recorded local path (sets full_local_path to
// NULL) for the video identified by source and id. It returns whatever error
// the UPDATE statement produces.
func (c *SyncDb) MarkVideoUncached(source, id string) error {
	const clearPathQuery = `
UPDATE videos
SET full_local_path = NULL
WHERE source = ? AND native_id = ?;
`
	_, err := c.db.Exec(clearPathQuery, source, id)
	return err
}
func (c *SyncDb) _SaveVideoData(video SourceVideo) error {
upsertSql := ` upsertSql := `
INSERT INTO videos ( INSERT INTO videos (
source, source,
@ -152,7 +310,33 @@ WHERE source = ? AND native_id = ?
} }
} }
func (c *SyncDb) GetSavedVideoData(source, id string) (*SourceVideo, *string, error) { func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) {
selectSql := `
SELECT
full_local_path
FROM videos
WHERE source = ? AND native_id = ?
`
row := c.db.QueryRow(selectSql, source, id)
var localPath sql.NullString
err := row.Scan(&localPath)
if err == sql.ErrNoRows {
return false, "", nil
} else if err != nil {
log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err)
return false, "", err
}
if localPath.Valid {
return true, localPath.String, nil
} else {
return false, "", nil
}
}
func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) {
selectSql := ` selectSql := `
SELECT SELECT
native_id, native_id,
@ -168,40 +352,107 @@ WHERE source = ? AND native_id = ?
` `
row := c.db.QueryRow(selectSql, source, id) row := c.db.QueryRow(selectSql, source, id)
var record syncRecord var record SyncRecord
err := row.Scan( err := row.Scan(
&record.nativeID, &record.NativeID,
&record.title, &record.Title,
&record.description, &record.Description,
&record.sourceURL, &record.SourceURL,
&record.releaseTime, &record.ReleaseTime,
&record.thumbnailURL, &record.ThumbnailURL,
&record.fullLocalPath, &record.FullLocalPath,
&record.claimID, &record.ClaimID,
) )
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
log.Debugf("Data for YouTube:%s is not in the sync DB", id) log.Debugf("Data for %s:%s is not in the sync DB", source, id)
return nil, nil, nil return nil, nil
} else if err != nil { } else if err != nil {
log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err) log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err)
return nil, nil, err return nil, err
} }
sourceVideo, claimID := record.toSourceVideo()
if includeTags {
tags, err := c.getTags(source, id) tags, err := c.getTags(source, id)
if err != nil { if err != nil {
return nil, nil, err return nil, err
}
record.Tags = &tags
} }
if includeLanguages {
languages, err := c.getLanguages(source, id) languages, err := c.getLanguages(source, id)
if err != nil { if err != nil {
return nil, nil, err return nil, err
}
record.Languages = &languages
} }
sourceVideo.Tags = tags return &record, nil
sourceVideo.Languages = languages }
return &sourceVideo, claimID, nil func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) {
selectSql := `
SELECT
native_id
FROM videos
WHERE source = ? AND claim_id IS NULL
`
ids := []string{}
rows, err := c.db.Query(selectSql, source)
if err != nil {
return ids, err
}
defer rows.Close()
for rows.Next() {
var id string
err = rows.Scan(&id)
if err != nil {
return ids, err
}
ids = append(ids, id)
}
return ids, nil
}
func (c *SyncDb) GetSummary() (*SyncDbSummary, error) {
selectSql := `
SELECT
COUNT() AS total,
COUNT(v_unpub.full_local_path) AS cached_unpublished,
COUNT(v_all.claim_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished,
MAX(v_all.release_time) AS latest_known,
MAX(v_pub.release_time) AS latest_published
FROM videos v_all
LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL
LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL
`
row := c.db.QueryRow(selectSql)
var summary SyncDbSummary
var latestKnown, latestPublished sql.NullInt64
err := row.Scan(
&summary.Total,
&summary.CachedUnpublished,
&summary.UncachedUnpublished,
&latestKnown,
&latestPublished,
)
if err != nil {
log.Errorf("Error querying sync DB summary: %v", err)
return nil, err
}
if latestKnown.Valid {
summary.LatestKnown = latestKnown.Int64
}
if latestPublished.Valid {
summary.LatestPublished = latestPublished.Int64
}
return &summary, nil
} }
func (c *SyncDb) ensureSchema() error { func (c *SyncDb) ensureSchema() error {
@ -215,6 +466,7 @@ CREATE TABLE IF NOT EXISTS videos (
release_time INT, release_time INT,
thumbnail_url TEXT, thumbnail_url TEXT,
full_local_path TEXT, full_local_path TEXT,
claim_name TEXT,
claim_id TEXT, claim_id TEXT,
PRIMARY KEY (source, native_id) PRIMARY KEY (source, native_id)
); );
@ -343,53 +595,82 @@ WHERE source = ? AND native_id = ?;
return languages, nil return languages, nil
} }
type syncRecord struct { type SyncRecord struct {
source string Source string
nativeID string NativeID string
title sql.NullString Title sql.NullString
description sql.NullString Description sql.NullString
sourceURL string SourceURL sql.NullString
releaseTime sql.NullInt64 ReleaseTime sql.NullInt64
thumbnailURL sql.NullString ThumbnailURL sql.NullString
fullLocalPath string FullLocalPath sql.NullString
claimID sql.NullString ClaimID sql.NullString
Tags *[]string
Languages *[]string
} }
func (r *syncRecord) toSourceVideo() (SourceVideo, *string) { func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord {
video := SourceVideo { r := SyncRecord {
ID: r.nativeID, Source: v.Source,
Source: r.source, NativeID: v.ID,
SourceURL: r.sourceURL, SourceURL: sql.NullString { String: v.SourceURL, Valid: true },
FullLocalPath: r.fullLocalPath,
} }
if r.title.Valid { if v.Title != nil {
video.Title = &r.title.String r.Title = sql.NullString { String: *v.Title, Valid: true }
} else {
video.Title = nil
} }
if r.description.Valid { if v.Description != nil {
video.Description = &r.description.String r.Description = sql.NullString { String: *v.Description, Valid: true }
} else {
video.Description = nil
} }
if r.releaseTime.Valid { if v.ThumbnailURL != nil {
video.ReleaseTime = &r.releaseTime.Int64 r.ThumbnailURL = sql.NullString { String: *v.ThumbnailURL, Valid: true }
} else {
video.ReleaseTime = nil
} }
if r.thumbnailURL.Valid { if v.FullLocalPath != nil {
video.ThumbnailURL = &r.thumbnailURL.String r.FullLocalPath = sql.NullString { String: *v.FullLocalPath, Valid: true }
} else {
video.ThumbnailURL = nil
} }
if r.claimID.Valid { if v.ReleaseTime != nil {
return video, &r.claimID.String r.ReleaseTime = sql.NullInt64 { Int64: *v.ReleaseTime, Valid: true }
} else {
return video, nil
} }
if len(v.Tags) > 0 {
r.Tags = &v.Tags
}
if len(v.Languages) > 0 {
r.Languages = &v.Languages
}
return r
}
func (r *SyncRecord) ToPublishableVideo() *PublishableVideo {
if !(r.Title.Valid &&
r.Description.Valid &&
r.SourceURL.Valid &&
r.ReleaseTime.Valid &&
r.ThumbnailURL.Valid &&
r.FullLocalPath.Valid &&
r.Tags != nil &&
r.Languages != nil) {
return nil
}
video := PublishableVideo {
ID: r.NativeID,
Source: r.Source,
Description: r.Description.String,
SourceURL: r.SourceURL.String,
ReleaseTime: r.ReleaseTime.Int64,
ThumbnailURL: r.ThumbnailURL.String,
FullLocalPath: r.FullLocalPath.String,
Tags: *r.Tags,
Languages: *r.Languages,
}
return &video
} }

View file

@ -0,0 +1,51 @@
package local
// YouTubeChannelScanner is implemented by types that can enumerate the videos
// of a YouTube channel.
type YouTubeChannelScanner interface {
// Scan returns a receive-only channel on which scan results (a video or an
// error) are delivered. sinceTimestamp is the lower bound handed to the
// scanner for the videos' release time.
Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult
}
// YouTubeAPIChannelScanner is a channel scanner backed by the YouTube API
// client.
type YouTubeAPIChannelScanner struct {
// api is the YouTube API client used to list the channel's videos.
api *YouTubeAPI
// channel identifies the channel to scan; it is passed to the API as the
// channel ID.
channel string
}
// NewYouTubeAPIChannelScanner creates a scanner for the given channel that
// talks to the YouTube API using apiKey.
func NewYouTubeAPIChannelScanner(apiKey, channel string) *YouTubeAPIChannelScanner {
	return &YouTubeAPIChannelScanner{
		api:     NewYouTubeAPI(apiKey),
		channel: channel,
	}
}
// Scan walks the channel's video listing page by page and delivers each
// video on the returned channel (buffer size 10). sinceTimestamp is forwarded
// to the API as the publication cutoff.
//
// The first page is always requested; further pages are requested for as long
// as the API hands back a non-empty next-page token. If a page request fails,
// one result carrying the error is sent and scanning stops. The channel is
// closed when scanning ends.
func (s *YouTubeAPIChannelScanner) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
	results := make(chan SourceScanIteratorResult, 10)

	go func() {
		defer close(results)

		pageToken := ""
		for {
			page, next, err := s.api.GetChannelVideosPage(s.channel, sinceTimestamp, pageToken)
			if err != nil {
				results <- SourceScanIteratorResult{
					Video: nil,
					Error: err,
				}
				return
			}

			for i := range page {
				// Hand out a private copy so each result owns its video.
				found := page[i]
				results <- SourceScanIteratorResult{
					Video: &found,
					Error: nil,
				}
			}

			// An empty token means this was the last page.
			if next == "" {
				return
			}
			pageToken = next
		}
	}()

	return results
}

View file

@ -43,3 +43,32 @@ func (e *YouTubeAPIVideoEnricher) EnrichMissing(source *SourceVideo) error {
} }
return nil return nil
} }
// CacheVideoEnricher fills in missing video data from records already stored
// in the sync DB.
type CacheVideoEnricher struct {
// syncDB is the sync DB that is consulted for previously stored records.
syncDB *SyncDb
}
// NewCacheVideoEnricher creates an enricher that looks up missing video data
// in the given sync DB.
func NewCacheVideoEnricher(syncDB *SyncDb) *CacheVideoEnricher {
	return &CacheVideoEnricher{syncDB: syncDB}
}
// EnrichMissing fills in source.ReleaseTime from the sync DB when it is not
// set yet.
//
// If the video already has a release time nothing is done. Otherwise the
// video's record is looked up (without tags or languages); when a record with
// a non-NULL release time exists, that value is assigned to the video. A
// missing record or a NULL release time leaves the video unchanged and is not
// an error. Only a failing DB lookup is reported as an error.
func (e *CacheVideoEnricher) EnrichMissing(source *SourceVideo) error {
	if source.ReleaseTime != nil {
		// Name this enricher in the message so the log shows who skipped.
		log.Debugf("Video %s does not need enrichment. CacheVideoEnricher is skipping.", source.ID)
		return nil
	}

	cached, err := e.syncDB.GetVideoRecord(source.Source, source.ID, false, false)
	if err != nil {
		log.Errorf("Error getting cached video %s: %v", source.ID, err)
		return err
	}

	if cached != nil && cached.ReleaseTime.Valid {
		source.ReleaseTime = &cached.ReleaseTime.Int64
	}
	return nil
}

View file

@ -8,6 +8,8 @@ import (
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/util"
) )
type YouTubeAPI struct { type YouTubeAPI struct {
@ -72,8 +74,80 @@ func (a *YouTubeAPI) GetVideoSnippet(videoID string) (*VideoSnippet, error) {
return &result.Items[0].Snippet, nil return &result.Items[0].Snippet, nil
} }
// GetChannelVideosPage fetches one page of a channel's videos from the
// YouTube search endpoint.
//
// channelID selects the channel, publishedAfter (Unix seconds) is sent as the
// publishedAfter filter in RFC 3339 form, and pageToken selects the page (an
// empty token requests the first page). At most 5 results are requested per
// page.
//
// It returns the videos of the page and the token of the next page as reported
// by the API. Each returned video carries only its ID, the source name
// "YouTube" and its release time; the release time is nil when the API's
// publish time cannot be parsed. On failure an empty slice, an empty token and
// the error are returned.
func (a *YouTubeAPI) GetChannelVideosPage(channelID string, publishedAfter int64, pageToken string) ([]SourceVideo, string, error) {
	req, err := http.NewRequest("GET", "https://youtube.googleapis.com/youtube/v3/search", nil)
	if err != nil {
		log.Errorf("Error creating http client for YouTube API: %v", err)
		return []SourceVideo{}, "", err
	}

	query := req.URL.Query()
	query.Add("part", "snippet")
	query.Add("type", "video")
	query.Add("channelId", channelID)
	query.Add("publishedAfter", time.Unix(publishedAfter, 0).Format(time.RFC3339))
	query.Add("maxResults", "5")
	if pageToken != "" {
		query.Add("pageToken", pageToken)
	}
	query.Add("key", a.apiKey)
	req.URL.RawQuery = query.Encode()

	req.Header.Add("Accept", "application/json")

	resp, err := a.client.Do(req)
	if err != nil {
		log.Errorf("Error from YouTube API: %v", err)
		return []SourceVideo{}, "", err
	}
	// Only register the close once the request is known to have succeeded;
	// resp is nil when Do returns an error.
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Errorf("Error reading response from YouTube API: %v", err)
		return []SourceVideo{}, "", err
	}
	log.Tracef("Response from YouTube API: %s", string(body[:]))

	var result videoSearchResponse
	err = json.Unmarshal(body, &result)
	if err != nil {
		log.Errorf("Error deserializing video list response from YouTube API: %v", err)
		return []SourceVideo{}, "", err
	}

	videos := make([]SourceVideo, 0, len(result.Items))
	for _, item := range result.Items {
		var releaseTime *int64
		publishedAt, err := time.Parse(time.RFC3339, item.Snippet.PublishedAt)
		if err != nil {
			// An unparsable publish time is not fatal; the video is still
			// reported, just without a release time.
			log.Errorf("Unable to parse publish time of %s while scanning YouTube channel %s: %v", item.ID.VideoID, channelID, err)
			releaseTime = nil
		} else {
			releaseTime = util.PtrToInt64(publishedAt.Unix())
		}

		video := SourceVideo{
			ID:          item.ID.VideoID,
			Source:      "YouTube",
			ReleaseTime: releaseTime,
		}
		videos = append(videos, video)
	}

	return videos, result.NextPageToken, nil
}
type videoListResponse struct { type videoListResponse struct {
NextPageToken string `json:"nextPageToken"`
Items []struct { Items []struct {
ID string `json:"id"`
Snippet VideoSnippet `json:"snippet"`
} `json:"items"`
}
type videoSearchResponse struct {
NextPageToken string `json:"nextPageToken"`
Items []struct {
ID struct{
VideoID string `json:"videoId"`
} `json:"id"`
Snippet VideoSnippet `json:"snippet"` Snippet VideoSnippet `json:"snippet"`
} `json:"items"` } `json:"items"`
} }

View file

@ -8,10 +8,11 @@ import (
type YtdlVideoSource struct { type YtdlVideoSource struct {
downloader Ytdl downloader Ytdl
channelScanner YouTubeChannelScanner
enrichers []YouTubeVideoEnricher enrichers []YouTubeVideoEnricher
} }
func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlVideoSource, error) { func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig, syncDB *SyncDb) (*YtdlVideoSource, error) {
ytdl, err := NewYtdl(downloadDir) ytdl, err := NewYtdl(downloadDir)
if err != nil { if err != nil {
return nil, err return nil, err
@ -21,14 +22,27 @@ func NewYtdlVideoSource(downloadDir string, config *YouTubeSourceConfig) (*YtdlV
downloader: *ytdl, downloader: *ytdl,
} }
if syncDB != nil {
source.enrichers = append(source.enrichers, NewCacheVideoEnricher(syncDB))
}
if config.APIKey != "" { if config.APIKey != "" {
ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey) ytapiEnricher := NewYouTubeAPIVideoEnricher(config.APIKey)
source.enrichers = append(source.enrichers, ytapiEnricher) source.enrichers = append(source.enrichers, ytapiEnricher)
source.channelScanner = NewYouTubeAPIChannelScanner(config.APIKey, config.ChannelID)
}
if source.channelScanner == nil {
log.Warnf("No means of scanning source channels has been provided")
} }
return &source, nil return &source, nil
} }
// SourceName returns the name under which videos from this source are
// recorded; for this source it is always "YouTube".
func (s *YtdlVideoSource) SourceName() string {
	const sourceName = "YouTube"
	return sourceName
}
func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
metadata, err := s.downloader.GetVideoMetadata(id) metadata, err := s.downloader.GetVideoMetadata(id)
if err != nil { if err != nil {
@ -57,7 +71,7 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
Tags: metadata.Tags, Tags: metadata.Tags,
ReleaseTime: nil, ReleaseTime: nil,
ThumbnailURL: &bestThumbnail.URL, ThumbnailURL: &bestThumbnail.URL,
FullLocalPath: videoPath, FullLocalPath: &videoPath,
} }
for _, enricher := range s.enrichers { for _, enricher := range s.enrichers {
@ -75,3 +89,13 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
func (s *YtdlVideoSource) DeleteLocalCache(id string) error { func (s *YtdlVideoSource) DeleteLocalCache(id string) error {
return s.downloader.DeleteVideoFiles(id) return s.downloader.DeleteVideoFiles(id)
} }
// Scan delegates channel scanning to the configured channel scanner. When no
// scanner is configured it returns an already-closed channel, so that ranging
// over the result ends immediately without yielding anything.
func (s *YtdlVideoSource) Scan(sinceTimestamp int64) <-chan SourceScanIteratorResult {
	if s.channelScanner == nil {
		empty := make(chan SourceScanIteratorResult, 1)
		close(empty)
		return empty
	}

	return s.channelScanner.Scan(sinceTimestamp)
}