Add local DB to track syncs in preparation for whole channel syncing.
This commit is contained in:
parent
ce901f6b01
commit
e8adf6f6ce
6 changed files with 455 additions and 6 deletions
1
go.mod
1
go.mod
|
@ -22,6 +22,7 @@ require (
|
||||||
github.com/kr/pretty v0.2.1 // indirect
|
github.com/kr/pretty v0.2.1 // indirect
|
||||||
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b
|
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b
|
||||||
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
|
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
|
||||||
|
github.com/mattn/go-sqlite3 v1.10.0
|
||||||
github.com/miekg/dns v1.1.22 // indirect
|
github.com/miekg/dns v1.1.22 // indirect
|
||||||
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
|
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
|
||||||
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
|
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -318,6 +318,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
|
||||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||||
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
|
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
|
||||||
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
||||||
|
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
|
||||||
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||||
|
|
|
@ -21,6 +21,7 @@ type SyncContext struct {
|
||||||
KeepCache bool
|
KeepCache bool
|
||||||
ReflectStreams bool
|
ReflectStreams bool
|
||||||
TempDir string
|
TempDir string
|
||||||
|
SyncDbPath string
|
||||||
LbrynetAddr string
|
LbrynetAddr string
|
||||||
ChannelID string
|
ChannelID string
|
||||||
PublishBid float64
|
PublishBid float64
|
||||||
|
@ -31,6 +32,9 @@ func (c *SyncContext) Validate() error {
|
||||||
if c.TempDir == "" {
|
if c.TempDir == "" {
|
||||||
return errors.New("No TempDir provided")
|
return errors.New("No TempDir provided")
|
||||||
}
|
}
|
||||||
|
if c.SyncDbPath == "" {
|
||||||
|
return errors.New("No sync DB path provided")
|
||||||
|
}
|
||||||
if c.LbrynetAddr == "" {
|
if c.LbrynetAddr == "" {
|
||||||
return errors.New("No Lbrynet address provided")
|
return errors.New("No Lbrynet address provided")
|
||||||
}
|
}
|
||||||
|
@ -60,6 +64,7 @@ func AddCommand(rootCmd *cobra.Command) {
|
||||||
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
|
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
|
||||||
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
|
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
|
||||||
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
|
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
|
||||||
|
cmd.Flags().StringVar(&syncContext.SyncDbPath, "sync-db-path", getEnvDefault("SYNC_DB_PATH", ""), "Path to the local sync DB")
|
||||||
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
|
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
|
||||||
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
|
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
|
||||||
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
|
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")
|
||||||
|
@ -87,6 +92,24 @@ func localCmd(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
log.Debugf("Running sync for video ID %s", videoID)
|
log.Debugf("Running sync for video ID %s", videoID)
|
||||||
|
|
||||||
|
syncDB, err := NewSyncDb(syncContext.SyncDbPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error creating sync DB: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer syncDB.Close()
|
||||||
|
|
||||||
|
isSynced, claimID, err := syncDB.IsVideoPublished("YouTube", videoID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error checking if video is already synced: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if isSynced {
|
||||||
|
log.Infof("Video %s is already published as %s.", videoID, claimID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var publisher VideoPublisher
|
var publisher VideoPublisher
|
||||||
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
|
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -109,6 +132,12 @@ func localCmd(cmd *cobra.Command, args []string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = syncDB.SaveVideoData(*sourceVideo)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error saving video data: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
|
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error processing source video for publishing: %v", err)
|
log.Errorf("Error processing source video for publishing: %v", err)
|
||||||
|
@ -121,11 +150,18 @@ func localCmd(cmd *cobra.Command, args []string) {
|
||||||
log.Debugf("Object to be published: %v", processedVideo)
|
log.Debugf("Object to be published: %v", processedVideo)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
|
claimID, doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error publishing video: %v", err)
|
log.Errorf("Error publishing video: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
err = syncDB.SaveVideoPublication(*processedVideo, claimID)
|
||||||
|
if err != nil {
|
||||||
|
// Sync DB is corrupted after getting here
|
||||||
|
// and will allow double publication.
|
||||||
|
log.Errorf("Error saving video publication to sync DB: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if syncContext.ReflectStreams {
|
if syncContext.ReflectStreams {
|
||||||
err = <-doneReflectingCh
|
err = <-doneReflectingCh
|
||||||
|
@ -149,6 +185,7 @@ func localCmd(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
type SourceVideo struct {
|
type SourceVideo struct {
|
||||||
ID string
|
ID string
|
||||||
|
Source string
|
||||||
Title *string
|
Title *string
|
||||||
Description *string
|
Description *string
|
||||||
SourceURL string
|
SourceURL string
|
||||||
|
@ -161,6 +198,7 @@ type SourceVideo struct {
|
||||||
|
|
||||||
type PublishableVideo struct {
|
type PublishableVideo struct {
|
||||||
ID string
|
ID string
|
||||||
|
Source string
|
||||||
ClaimName string
|
ClaimName string
|
||||||
Title string
|
Title string
|
||||||
Description string
|
Description string
|
||||||
|
@ -213,6 +251,8 @@ func processVideoForPublishing(source SourceVideo, channelID string) (*Publishab
|
||||||
}
|
}
|
||||||
|
|
||||||
processed := PublishableVideo {
|
processed := PublishableVideo {
|
||||||
|
ID: source.ID,
|
||||||
|
Source: source.Source,
|
||||||
ClaimName: claimName,
|
ClaimName: claimName,
|
||||||
Title: title,
|
Title: title,
|
||||||
Description: getAbbrevDescription(source),
|
Description: getAbbrevDescription(source),
|
||||||
|
@ -249,5 +289,5 @@ type VideoSource interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type VideoPublisher interface {
|
type VideoPublisher interface {
|
||||||
Publish(video PublishableVideo, reflectStream bool) (chan error, error)
|
Publish(video PublishableVideo, reflectStream bool) (string, chan error, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*Local
|
||||||
return &publisher, nil
|
return &publisher, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) {
|
func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, chan error, error) {
|
||||||
streamCreateOptions := jsonrpc.StreamCreateOptions {
|
streamCreateOptions := jsonrpc.StreamCreateOptions {
|
||||||
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
|
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
|
||||||
Title: &video.Title,
|
Title: &video.Title,
|
||||||
|
@ -64,11 +64,22 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool)
|
||||||
|
|
||||||
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
|
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var claimID *string
|
||||||
|
for _, output := range txSummary.Outputs {
|
||||||
|
if output.Type == "claim" {
|
||||||
|
claimID = &output.ClaimID
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if claimID == nil {
|
||||||
|
return "", nil, errors.New("Publish transaction did not have a claim output.")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !reflectStream {
|
if !reflectStream {
|
||||||
return nil, nil
|
return "", nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
done := make(chan error, 1)
|
done := make(chan error, 1)
|
||||||
|
@ -102,7 +113,7 @@ func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool)
|
||||||
done <- nil
|
done <- nil
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return done, nil
|
return *claimID, done, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
|
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
|
||||||
|
|
395
local/syncDb.go
Normal file
395
local/syncDb.go
Normal file
|
@ -0,0 +1,395 @@
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncDb struct {
|
||||||
|
db *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSyncDb(path string) (*SyncDb, error) {
|
||||||
|
db, err := sql.Open("sqlite3", path)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error opening cache DB at %s: %v", path, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cache := SyncDb {
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
err = cache.ensureSchema()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while ensuring sync DB structure: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &cache, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) Close() error {
|
||||||
|
return c.db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) SaveVideoData(video SourceVideo) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO videos (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
title = excluded.title,
|
||||||
|
description = excluded.description,
|
||||||
|
source_url = excluded.source_url,
|
||||||
|
release_time = excluded.release_time,
|
||||||
|
thumbnail_url = excluded.thumbnail_url,
|
||||||
|
full_local_path = excluded.full_local_path;
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
video.Source,
|
||||||
|
video.ID,
|
||||||
|
video.Title,
|
||||||
|
video.Description,
|
||||||
|
video.SourceURL,
|
||||||
|
video.ReleaseTime,
|
||||||
|
video.ThumbnailURL,
|
||||||
|
video.FullLocalPath,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO videos (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path,
|
||||||
|
claim_id
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
title = excluded.title,
|
||||||
|
description = excluded.description,
|
||||||
|
source_url = excluded.source_url,
|
||||||
|
release_time = excluded.release_time,
|
||||||
|
thumbnail_url = excluded.thumbnail_url,
|
||||||
|
full_local_path = excluded.full_local_path,
|
||||||
|
claim_id = excluded.claim_id;
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
video.Source,
|
||||||
|
video.ID,
|
||||||
|
video.Title,
|
||||||
|
video.Description,
|
||||||
|
video.SourceURL,
|
||||||
|
video.ReleaseTime,
|
||||||
|
video.ThumbnailURL,
|
||||||
|
video.FullLocalPath,
|
||||||
|
claimID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT
|
||||||
|
claim_id
|
||||||
|
FROM videos
|
||||||
|
WHERE source = ? AND native_id = ?
|
||||||
|
`
|
||||||
|
row := c.db.QueryRow(selectSql, source, id)
|
||||||
|
|
||||||
|
var claimID sql.NullString
|
||||||
|
err := row.Scan(&claimID)
|
||||||
|
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return false, "", nil
|
||||||
|
} else if err != nil {
|
||||||
|
log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err)
|
||||||
|
return false, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if claimID.Valid {
|
||||||
|
return true, claimID.String, nil
|
||||||
|
} else {
|
||||||
|
return false, "", nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) GetSavedVideoData(source, id string) (*SourceVideo, *string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT
|
||||||
|
native_id,
|
||||||
|
title,
|
||||||
|
description,
|
||||||
|
source_url,
|
||||||
|
release_time,
|
||||||
|
thumbnail_url,
|
||||||
|
full_local_path,
|
||||||
|
claim_id
|
||||||
|
FROM videos
|
||||||
|
WHERE source = ? AND native_id = ?
|
||||||
|
`
|
||||||
|
row := c.db.QueryRow(selectSql, source, id)
|
||||||
|
|
||||||
|
var record syncRecord
|
||||||
|
err := row.Scan(
|
||||||
|
&record.nativeID,
|
||||||
|
&record.title,
|
||||||
|
&record.description,
|
||||||
|
&record.sourceURL,
|
||||||
|
&record.releaseTime,
|
||||||
|
&record.thumbnailURL,
|
||||||
|
&record.fullLocalPath,
|
||||||
|
&record.claimID,
|
||||||
|
)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
log.Debugf("Data for YouTube:%s is not in the sync DB", id)
|
||||||
|
return nil, nil, nil
|
||||||
|
} else if err != nil {
|
||||||
|
log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err)
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
sourceVideo, claimID := record.toSourceVideo()
|
||||||
|
|
||||||
|
tags, err := c.getTags(source, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
languages, err := c.getLanguages(source, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceVideo.Tags = tags
|
||||||
|
sourceVideo.Languages = languages
|
||||||
|
|
||||||
|
return &sourceVideo, claimID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) ensureSchema() error {
|
||||||
|
createSql := `
|
||||||
|
CREATE TABLE IF NOT EXISTS videos (
|
||||||
|
source TEXT,
|
||||||
|
native_id TEXT,
|
||||||
|
title TEXT,
|
||||||
|
description TEXT,
|
||||||
|
source_url TEXT,
|
||||||
|
release_time INT,
|
||||||
|
thumbnail_url TEXT,
|
||||||
|
full_local_path TEXT,
|
||||||
|
claim_id TEXT,
|
||||||
|
PRIMARY KEY (source, native_id)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS video_tags (
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
native_id TEXT NOT NULL,
|
||||||
|
tag TEXT NOT NULL,
|
||||||
|
UNIQUE (source, native_id, tag)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS video_languages (
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
native_id TEXT NOT NULL,
|
||||||
|
language TEXT NOT NULL,
|
||||||
|
UNIQUE (source, native_id, language)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
_, err := c.db.Exec(createSql)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) upsertTags(source, id string, tags []string) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO video_tags (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
tag
|
||||||
|
) VALUES (?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id, tag)
|
||||||
|
DO NOTHING;
|
||||||
|
`
|
||||||
|
|
||||||
|
for _, tag := range tags {
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
source,
|
||||||
|
id,
|
||||||
|
tag,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) getTags(source, id string) ([]string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT tag
|
||||||
|
FROM video_tags
|
||||||
|
WHERE source = ? AND native_id = ?;
|
||||||
|
`
|
||||||
|
|
||||||
|
rows, err := c.db.Query(selectSql, source, id)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var tags []string
|
||||||
|
for rows.Next() {
|
||||||
|
var tag string
|
||||||
|
err = rows.Scan(&tag)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tags = append(tags, tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) upsertLanguages(source, id string, languages []string) error {
|
||||||
|
upsertSql := `
|
||||||
|
INSERT INTO video_languages (
|
||||||
|
source,
|
||||||
|
native_id,
|
||||||
|
language
|
||||||
|
) VALUES (?, ?, ?)
|
||||||
|
ON CONFLICT (source, native_id, language)
|
||||||
|
DO NOTHING;
|
||||||
|
`
|
||||||
|
|
||||||
|
for _, language := range languages {
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
upsertSql,
|
||||||
|
source,
|
||||||
|
id,
|
||||||
|
language,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SyncDb) getLanguages(source, id string) ([]string, error) {
|
||||||
|
selectSql := `
|
||||||
|
SELECT language
|
||||||
|
FROM video_languages
|
||||||
|
WHERE source = ? AND native_id = ?;
|
||||||
|
`
|
||||||
|
|
||||||
|
rows, err := c.db.Query(selectSql, source, id)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var languages []string
|
||||||
|
for rows.Next() {
|
||||||
|
var language string
|
||||||
|
err = rows.Scan(&language)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
languages = append(languages, language)
|
||||||
|
}
|
||||||
|
|
||||||
|
return languages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type syncRecord struct {
|
||||||
|
source string
|
||||||
|
nativeID string
|
||||||
|
title sql.NullString
|
||||||
|
description sql.NullString
|
||||||
|
sourceURL string
|
||||||
|
releaseTime sql.NullInt64
|
||||||
|
thumbnailURL sql.NullString
|
||||||
|
fullLocalPath string
|
||||||
|
claimID sql.NullString
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *syncRecord) toSourceVideo() (SourceVideo, *string) {
|
||||||
|
video := SourceVideo {
|
||||||
|
ID: r.nativeID,
|
||||||
|
Source: r.source,
|
||||||
|
SourceURL: r.sourceURL,
|
||||||
|
FullLocalPath: r.fullLocalPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.title.Valid {
|
||||||
|
video.Title = &r.title.String
|
||||||
|
} else {
|
||||||
|
video.Title = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.description.Valid {
|
||||||
|
video.Description = &r.description.String
|
||||||
|
} else {
|
||||||
|
video.Description = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.releaseTime.Valid {
|
||||||
|
video.ReleaseTime = &r.releaseTime.Int64
|
||||||
|
} else {
|
||||||
|
video.ReleaseTime = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.thumbnailURL.Valid {
|
||||||
|
video.ThumbnailURL = &r.thumbnailURL.String
|
||||||
|
} else {
|
||||||
|
video.ThumbnailURL = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.claimID.Valid {
|
||||||
|
return video, &r.claimID.String
|
||||||
|
} else {
|
||||||
|
return video, nil
|
||||||
|
}
|
||||||
|
}
|
|
@ -49,6 +49,7 @@ func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
|
||||||
|
|
||||||
sourceVideo := SourceVideo {
|
sourceVideo := SourceVideo {
|
||||||
ID: id,
|
ID: id,
|
||||||
|
Source: "YouTube",
|
||||||
Title: &metadata.Title,
|
Title: &metadata.Title,
|
||||||
Description: &metadata.Description,
|
Description: &metadata.Description,
|
||||||
SourceURL: "\nhttps://www.youtube.com/watch?v=" + id,
|
SourceURL: "\nhttps://www.youtube.com/watch?v=" + id,
|
||||||
|
|
Loading…
Reference in a new issue