diff --git a/go.mod b/go.mod index 40b2c65..0c2cdac 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/kr/pretty v0.2.1 // indirect github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262 + github.com/mattn/go-sqlite3 v1.10.0 github.com/miekg/dns v1.1.22 // indirect github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b github.com/opencontainers/go-digest v1.0.0-rc1 // indirect diff --git a/go.sum b/go.sum index 3b1c9d2..1d40796 100644 --- a/go.sum +++ b/go.sum @@ -318,6 +318,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/local/local.go b/local/local.go index 9b7837e..afd2d4f 100644 --- a/local/local.go +++ b/local/local.go @@ -21,6 +21,7 @@ type SyncContext struct { KeepCache bool ReflectStreams bool TempDir string + SyncDbPath string LbrynetAddr string ChannelID string PublishBid float64 @@ -31,6 +32,9 @@ func (c *SyncContext) Validate() error { if c.TempDir == "" { return errors.New("No TempDir provided") } + if c.SyncDbPath == "" { + return errors.New("No sync DB path provided") + } if c.LbrynetAddr == "" { return errors.New("No Lbrynet address provided") } @@ -60,6 +64,7 @@ func AddCommand(rootCmd *cobra.Command) { cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") + cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB") cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") @@ -87,6 +92,24 @@ func localCmd(cmd *cobra.Command, args []string) { log.Debugf("Running sync for video ID %s", videoID) + syncDB, err := NewSyncDb(syncContext.SyncDbPath) + if err != nil { + log.Errorf("Error creating sync DB: %v", err) + return + } + defer syncDB.Close() + + isSynced, claimID, err := syncDB.IsVideoPublished("YouTube", videoID) + if err != nil { + log.Errorf("Error checking if video is already synced: %v", err) + return + } + + if isSynced { + log.Infof("Video %s is already published as %s.", videoID, claimID) + return + } + var publisher VideoPublisher publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) if err != nil { @@ -109,6 +132,12 @@ func localCmd(cmd *cobra.Command, args []string) { return } + err = syncDB.SaveVideoData(*sourceVideo) + if err != nil { + log.Errorf("Error saving video data: %v", err) + return + } + processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) if err != nil { log.Errorf("Error processing source video for publishing: %v", err) @@ -121,11 +150,18 @@ func localCmd(cmd *cobra.Command, args []string) { log.Debugf("Object to be published: %v", processedVideo) } else { - doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) + claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) if err != nil { log.Errorf("Error publishing video: %v", err) return } + err = syncDB.SaveVideoPublication(*processedVideo, claimID) + if err != nil { + // Sync DB is corrupted after getting here + // and will allow double publication. + log.Errorf("Error saving video publication to sync DB: %v", err) + return + } if syncContext.ReflectStreams { err = <-doneReflectingCh @@ -149,6 +185,7 @@ func localCmd(cmd *cobra.Command, args []string) { type SourceVideo struct { ID string + Source string Title *string Description *string SourceURL string @@ -161,6 +198,7 @@ type SourceVideo struct { type PublishableVideo struct { ID string + Source string ClaimName string Title string Description string @@ -213,6 +251,8 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab } processed := PublishableVideo { + ID: source.ID, + Source: source.Source, ClaimName: claimName, Title: title, Description: getAbbrevDescription(source), @@ -249,5 +289,5 @@ type VideoSource interface { } type VideoPublisher interface { - Publish(video PublishableVideo, reflectStream bool) (chan error, error) + Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) } diff --git a/local/localSDKPublisher.go b/local/localSDKPublisher.go index 3886eb9..c27a721 100644 --- a/local/localSDKPublisher.go +++ b/local/localSDKPublisher.go @@ -48,7 +48,7 @@ func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*Local return &publisher, nil } -func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) { +func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) { streamCreateOptions := jsonrpc.StreamCreateOptions { ClaimCreateOptions: jsonrpc.ClaimCreateOptions { Title: &video.Title, @@ -64,11 +64,22 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) if err != nil { - return nil, err + return "", nil, err + } + + var claimID *string + for _, output := range txSummary.Outputs { + if output.Type == "claim" { + claimID = &output.ClaimID + break + } + } + if claimID == nil { + return "", nil, errors.New("Publish transaction did not have a claim output.") } if !reflectStream { - return nil, nil + return "", nil, nil } done := make(chan error, 1) @@ -102,7 +113,7 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) done <- nil }() - return done, nil + return *claimID, done, nil } // if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed diff --git a/local/syncDb.go b/local/syncDb.go new file mode 100644 index 0000000..34b3eb0 --- /dev/null +++ b/local/syncDb.go @@ -0,0 +1,395 @@ +package local + +import ( + "database/sql" + _ "github.com/mattn/go-sqlite3" + log "github.com/sirupsen/logrus" +) + +type SyncDb struct { + db *sql.DB +} + +func NewSyncDb(path string) (*SyncDb, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + log.Errorf("Error opening cache DB at %s: %v", path, err) + return nil, err + } + + cache := SyncDb { + db: db, + } + err = cache.ensureSchema() + if err != nil { + log.Errorf("Error while ensuring sync DB structure: %v", err) + return nil, err + } + + return &cache, nil +} + +func (c *SyncDb) Close() error { + return c.db.Close() +} + +func (c *SyncDb) SaveVideoData(video SourceVideo) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + ) + if err != nil { + return err + } + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error { + upsertSql := ` +INSERT INTO videos ( + source, + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path, + claim_id +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (source, native_id) +DO UPDATE SET + title = excluded.title, + description = excluded.description, + source_url = excluded.source_url, + release_time = excluded.release_time, + thumbnail_url = excluded.thumbnail_url, + full_local_path = excluded.full_local_path, + claim_id = excluded.claim_id; + ` + _, err := c.db.Exec( + upsertSql, + video.Source, + video.ID, + video.Title, + video.Description, + video.SourceURL, + video.ReleaseTime, + video.ThumbnailURL, + video.FullLocalPath, + claimID, + ) + if err != nil { + return err + } + + err = c.upsertTags(video.Source, video.ID, video.Tags) + if err != nil { + return err + } + + err = c.upsertLanguages(video.Source, video.ID, video.Languages) + return err +} + +func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) { + selectSql := ` +SELECT + claim_id +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var claimID sql.NullString + err := row.Scan(&claimID) + + if err == sql.ErrNoRows { + return false, "", nil + } else if err != nil { + log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err) + return false, "", err + } + + if claimID.Valid { + return true, claimID.String, nil + } else { + return false, "", nil + } +} + +func (c *SyncDb) GetSavedVideoData(source, id string) (*SourceVideo, *string, error) { + selectSql := ` +SELECT + native_id, + title, + description, + source_url, + release_time, + thumbnail_url, + full_local_path, + claim_id +FROM videos +WHERE source = ? AND native_id = ? + ` + row := c.db.QueryRow(selectSql, source, id) + + var record syncRecord + err := row.Scan( + &record.nativeID, + &record.title, + &record.description, + &record.sourceURL, + &record.releaseTime, + &record.thumbnailURL, + &record.fullLocalPath, + &record.claimID, + ) + if err == sql.ErrNoRows { + log.Debugf("Data for YouTube:%s is not in the sync DB", id) + return nil, nil, nil + } else if err != nil { + log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err) + return nil, nil, err + } + sourceVideo, claimID := record.toSourceVideo() + + tags, err := c.getTags(source, id) + if err != nil { + return nil, nil, err + } + + languages, err := c.getLanguages(source, id) + if err != nil { + return nil, nil, err + } + + sourceVideo.Tags = tags + sourceVideo.Languages = languages + + return &sourceVideo, claimID, nil +} + +func (c *SyncDb) ensureSchema() error { + createSql := ` +CREATE TABLE IF NOT EXISTS videos ( + source TEXT, + native_id TEXT, + title TEXT, + description TEXT, + source_url TEXT, + release_time INT, + thumbnail_url TEXT, + full_local_path TEXT, + claim_id TEXT, + PRIMARY KEY (source, native_id) +); +CREATE TABLE IF NOT EXISTS video_tags ( + source TEXT NOT NULL, + native_id TEXT NOT NULL, + tag TEXT NOT NULL, + UNIQUE (source, native_id, tag) +); +CREATE TABLE IF NOT EXISTS video_languages ( + source TEXT NOT NULL, + native_id TEXT NOT NULL, + language TEXT NOT NULL, + UNIQUE (source, native_id, language) +); + ` + _, err := c.db.Exec(createSql) + return err +} + +func (c *SyncDb) upsertTags(source, id string, tags []string) error { + upsertSql := ` +INSERT INTO video_tags ( + source, + native_id, + tag +) VALUES (?, ?, ?) +ON CONFLICT (source, native_id, tag) +DO NOTHING; + ` + + for _, tag := range tags { + _, err := c.db.Exec( + upsertSql, + source, + id, + tag, + ) + if err != nil { + log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err) + return err + } + } + return nil +} + +func (c *SyncDb) getTags(source, id string) ([]string, error) { + selectSql := ` +SELECT tag +FROM video_tags +WHERE source = ? AND native_id = ?; + ` + + rows, err := c.db.Query(selectSql, source, id) + if err != nil { + log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + defer rows.Close() + + var tags []string + for rows.Next() { + var tag string + err = rows.Scan(&tag) + if err != nil { + log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + tags = append(tags, tag) + } + + return tags, nil +} + +func (c *SyncDb) upsertLanguages(source, id string, languages []string) error { + upsertSql := ` +INSERT INTO video_languages ( + source, + native_id, + language +) VALUES (?, ?, ?) +ON CONFLICT (source, native_id, language) +DO NOTHING; + ` + + for _, language := range languages { + _, err := c.db.Exec( + upsertSql, + source, + id, + language, + ) + if err != nil { + log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err) + return err + } + } + return nil +} + +func (c *SyncDb) getLanguages(source, id string) ([]string, error) { + selectSql := ` +SELECT language +FROM video_languages +WHERE source = ? AND native_id = ?; + ` + + rows, err := c.db.Query(selectSql, source, id) + if err != nil { + log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + defer rows.Close() + + var languages []string + for rows.Next() { + var language string + err = rows.Scan(&language) + if err != nil { + log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err) + return nil, err + } + languages = append(languages, language) + } + + return languages, nil +} + +type syncRecord struct { + source string + nativeID string + title sql.NullString + description sql.NullString + sourceURL string + releaseTime sql.NullInt64 + thumbnailURL sql.NullString + fullLocalPath string + claimID sql.NullString +} + +func (r *syncRecord) toSourceVideo() (SourceVideo, *string) { + video := SourceVideo { + ID: r.nativeID, + Source: r.source, + SourceURL: r.sourceURL, + FullLocalPath: r.fullLocalPath, + } + + if r.title.Valid { + video.Title = &r.title.String + } else { + video.Title = nil + } + + if r.description.Valid { + video.Description = &r.description.String + } else { + video.Description = nil + } + + if r.releaseTime.Valid { + video.ReleaseTime = &r.releaseTime.Int64 + } else { + video.ReleaseTime = nil + } + + if r.thumbnailURL.Valid { + video.ThumbnailURL = &r.thumbnailURL.String + } else { + video.ThumbnailURL = nil + } + + if r.claimID.Valid { + return video, &r.claimID.String + } else { + return video, nil + } +} diff --git a/local/ytdlVideoSource.go b/local/ytdlVideoSource.go index b68c18f..13080f9 100644 --- a/local/ytdlVideoSource.go +++ b/local/ytdlVideoSource.go @@ -49,6 +49,7 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) { sourceVideo := SourceVideo { ID: id, + Source: "YouTube", Title: &metadata.Title, Description: &metadata.Description, SourceURL: "\nhttps://www.youtube.com/watch?v=" + id,