676 lines
13 KiB
Go
676 lines
13 KiB
Go
package local
|
|
|
|
import (
|
|
"database/sql"
|
|
_ "github.com/mattn/go-sqlite3"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type SyncDb struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
type SyncDbSummary struct {
|
|
Total int
|
|
CachedUnpublished int
|
|
UncachedUnpublished int
|
|
LatestKnown int64
|
|
LatestPublished int64
|
|
}
|
|
|
|
func NewSyncDb(path string) (*SyncDb, error) {
|
|
db, err := sql.Open("sqlite3", path)
|
|
if err != nil {
|
|
log.Errorf("Error opening cache DB at %s: %v", path, err)
|
|
return nil, err
|
|
}
|
|
|
|
cache := SyncDb {
|
|
db: db,
|
|
}
|
|
err = cache.ensureSchema()
|
|
if err != nil {
|
|
log.Errorf("Error while ensuring sync DB structure: %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
return &cache, nil
|
|
}
|
|
|
|
func (c *SyncDb) Close() error {
|
|
return c.db.Close()
|
|
}
|
|
|
|
func (c *SyncDb) SaveKnownVideo(video SourceVideo) error {
|
|
if video.ID == "" {
|
|
log.Warnf("Trying to save a video with no ID: %v", video)
|
|
}
|
|
insertSql := `
|
|
INSERT INTO videos (
|
|
source,
|
|
native_id,
|
|
title,
|
|
description,
|
|
source_url,
|
|
release_time,
|
|
thumbnail_url,
|
|
full_local_path
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT (source, native_id)
|
|
DO NOTHING;
|
|
`
|
|
|
|
r := SyncRecordFromSourceVideo(video)
|
|
|
|
_, err := c.db.Exec(
|
|
insertSql,
|
|
r.Source,
|
|
r.NativeID,
|
|
r.Title,
|
|
r.Description,
|
|
r.SourceURL,
|
|
r.ReleaseTime,
|
|
r.ThumbnailURL,
|
|
r.FullLocalPath,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error {
|
|
upsertSql := `
|
|
INSERT INTO videos (
|
|
source,
|
|
native_id,
|
|
claim_name,
|
|
title,
|
|
description,
|
|
source_url,
|
|
release_time,
|
|
thumbnail_url,
|
|
full_local_path
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT (source, native_id)
|
|
DO UPDATE SET
|
|
claim_name = excluded.claim_name,
|
|
title = excluded.title,
|
|
description = excluded.description,
|
|
source_url = excluded.source_url,
|
|
release_time = excluded.release_time,
|
|
thumbnail_url = excluded.thumbnail_url,
|
|
full_local_path = excluded.full_local_path;
|
|
`
|
|
_, err := c.db.Exec(
|
|
upsertSql,
|
|
video.Source,
|
|
video.ID,
|
|
video.ClaimName,
|
|
video.Title,
|
|
video.Description,
|
|
video.SourceURL,
|
|
video.ReleaseTime,
|
|
video.ThumbnailURL,
|
|
video.FullLocalPath,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
|
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
|
return err
|
|
}
|
|
|
|
func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error {
|
|
upsertSql := `
|
|
INSERT INTO videos (
|
|
source,
|
|
native_id,
|
|
claim_id,
|
|
claim_name,
|
|
title,
|
|
description,
|
|
source_url,
|
|
release_time,
|
|
thumbnail_url,
|
|
full_local_path
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT (source, native_id)
|
|
DO UPDATE SET
|
|
claim_id = excluded.claim_id,
|
|
claim_name = excluded.claim_name,
|
|
title = excluded.title,
|
|
description = excluded.description,
|
|
source_url = excluded.source_url,
|
|
release_time = excluded.release_time,
|
|
thumbnail_url = excluded.thumbnail_url,
|
|
full_local_path = excluded.full_local_path;
|
|
`
|
|
_, err := c.db.Exec(
|
|
upsertSql,
|
|
video.Source,
|
|
video.NativeID,
|
|
video.ClaimID,
|
|
video.ClaimName,
|
|
video.Title,
|
|
video.Description,
|
|
video.SourceURL,
|
|
video.ReleaseTime,
|
|
video.ThumbnailURL,
|
|
video.FullLocalPath,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
|
|
err = c.upsertTags(video.Source, video.NativeID, video.Tags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.upsertLanguages(video.Source, video.NativeID, video.Languages)
|
|
return err
|
|
}
|
|
|
|
func (c *SyncDb) MarkVideoUncached(source, id string) error {
|
|
updateSql := `
|
|
UPDATE videos
|
|
SET full_local_path = NULL
|
|
WHERE source = ? AND native_id = ?;
|
|
`
|
|
_, err := c.db.Exec(
|
|
updateSql,
|
|
source,
|
|
id,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (c *SyncDb) _SaveVideoData(video SourceVideo) error {
|
|
upsertSql := `
|
|
INSERT INTO videos (
|
|
source,
|
|
native_id,
|
|
title,
|
|
description,
|
|
source_url,
|
|
release_time,
|
|
thumbnail_url,
|
|
full_local_path
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT (source, native_id)
|
|
DO UPDATE SET
|
|
title = excluded.title,
|
|
description = excluded.description,
|
|
source_url = excluded.source_url,
|
|
release_time = excluded.release_time,
|
|
thumbnail_url = excluded.thumbnail_url,
|
|
full_local_path = excluded.full_local_path;
|
|
`
|
|
_, err := c.db.Exec(
|
|
upsertSql,
|
|
video.Source,
|
|
video.ID,
|
|
video.Title,
|
|
video.Description,
|
|
video.SourceURL,
|
|
video.ReleaseTime,
|
|
video.ThumbnailURL,
|
|
video.FullLocalPath,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
|
return err
|
|
}
|
|
|
|
func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error {
|
|
upsertSql := `
|
|
INSERT INTO videos (
|
|
source,
|
|
native_id,
|
|
title,
|
|
description,
|
|
source_url,
|
|
release_time,
|
|
thumbnail_url,
|
|
full_local_path,
|
|
claim_id
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT (source, native_id)
|
|
DO UPDATE SET
|
|
title = excluded.title,
|
|
description = excluded.description,
|
|
source_url = excluded.source_url,
|
|
release_time = excluded.release_time,
|
|
thumbnail_url = excluded.thumbnail_url,
|
|
full_local_path = excluded.full_local_path,
|
|
claim_id = excluded.claim_id;
|
|
`
|
|
_, err := c.db.Exec(
|
|
upsertSql,
|
|
video.Source,
|
|
video.ID,
|
|
video.Title,
|
|
video.Description,
|
|
video.SourceURL,
|
|
video.ReleaseTime,
|
|
video.ThumbnailURL,
|
|
video.FullLocalPath,
|
|
claimID,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.upsertTags(video.Source, video.ID, video.Tags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.upsertLanguages(video.Source, video.ID, video.Languages)
|
|
return err
|
|
}
|
|
|
|
func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) {
|
|
selectSql := `
|
|
SELECT
|
|
claim_id
|
|
FROM videos
|
|
WHERE source = ? AND native_id = ?
|
|
`
|
|
row := c.db.QueryRow(selectSql, source, id)
|
|
|
|
var claimID sql.NullString
|
|
err := row.Scan(&claimID)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return false, "", nil
|
|
} else if err != nil {
|
|
log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err)
|
|
return false, "", err
|
|
}
|
|
|
|
if claimID.Valid {
|
|
return true, claimID.String, nil
|
|
} else {
|
|
return false, "", nil
|
|
}
|
|
}
|
|
|
|
func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) {
|
|
selectSql := `
|
|
SELECT
|
|
full_local_path
|
|
FROM videos
|
|
WHERE source = ? AND native_id = ?
|
|
`
|
|
row := c.db.QueryRow(selectSql, source, id)
|
|
|
|
var localPath sql.NullString
|
|
err := row.Scan(&localPath)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return false, "", nil
|
|
} else if err != nil {
|
|
log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err)
|
|
return false, "", err
|
|
}
|
|
|
|
if localPath.Valid {
|
|
return true, localPath.String, nil
|
|
} else {
|
|
return false, "", nil
|
|
}
|
|
}
|
|
|
|
func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) {
|
|
selectSql := `
|
|
SELECT
|
|
native_id,
|
|
title,
|
|
description,
|
|
source_url,
|
|
release_time,
|
|
thumbnail_url,
|
|
full_local_path,
|
|
claim_id
|
|
FROM videos
|
|
WHERE source = ? AND native_id = ?
|
|
`
|
|
row := c.db.QueryRow(selectSql, source, id)
|
|
|
|
var record SyncRecord
|
|
err := row.Scan(
|
|
&record.NativeID,
|
|
&record.Title,
|
|
&record.Description,
|
|
&record.SourceURL,
|
|
&record.ReleaseTime,
|
|
&record.ThumbnailURL,
|
|
&record.FullLocalPath,
|
|
&record.ClaimID,
|
|
)
|
|
if err == sql.ErrNoRows {
|
|
log.Debugf("Data for %s:%s is not in the sync DB", source, id)
|
|
return nil, nil
|
|
} else if err != nil {
|
|
log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err)
|
|
return nil, err
|
|
}
|
|
|
|
if includeTags {
|
|
tags, err := c.getTags(source, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
record.Tags = &tags
|
|
}
|
|
|
|
if includeLanguages {
|
|
languages, err := c.getLanguages(source, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
record.Languages = &languages
|
|
}
|
|
|
|
return &record, nil
|
|
}
|
|
|
|
func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) {
|
|
selectSql := `
|
|
SELECT
|
|
native_id
|
|
FROM videos
|
|
WHERE source = ? AND claim_id IS NULL
|
|
`
|
|
ids := []string{}
|
|
|
|
rows, err := c.db.Query(selectSql, source)
|
|
if err != nil {
|
|
return ids, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var id string
|
|
err = rows.Scan(&id)
|
|
if err != nil {
|
|
return ids, err
|
|
}
|
|
ids = append(ids, id)
|
|
}
|
|
|
|
return ids, nil
|
|
}
|
|
|
|
func (c *SyncDb) GetSummary() (*SyncDbSummary, error) {
|
|
selectSql := `
|
|
SELECT
|
|
COUNT() AS total,
|
|
COUNT(v_unpub.full_local_path) AS cached_unpublished,
|
|
COUNT(v_all.claim_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished,
|
|
MAX(v_all.release_time) AS latest_known,
|
|
MAX(v_pub.release_time) AS latest_published
|
|
FROM videos v_all
|
|
LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL
|
|
LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL
|
|
`
|
|
row := c.db.QueryRow(selectSql)
|
|
|
|
var summary SyncDbSummary
|
|
var latestKnown, latestPublished sql.NullInt64
|
|
err := row.Scan(
|
|
&summary.Total,
|
|
&summary.CachedUnpublished,
|
|
&summary.UncachedUnpublished,
|
|
&latestKnown,
|
|
&latestPublished,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("Error querying sync DB summary: %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
if latestKnown.Valid {
|
|
summary.LatestKnown = latestKnown.Int64
|
|
}
|
|
if latestPublished.Valid {
|
|
summary.LatestPublished = latestPublished.Int64
|
|
}
|
|
|
|
return &summary, nil
|
|
}
|
|
|
|
func (c *SyncDb) ensureSchema() error {
|
|
createSql := `
|
|
CREATE TABLE IF NOT EXISTS videos (
|
|
source TEXT,
|
|
native_id TEXT,
|
|
title TEXT,
|
|
description TEXT,
|
|
source_url TEXT,
|
|
release_time INT,
|
|
thumbnail_url TEXT,
|
|
full_local_path TEXT,
|
|
claim_name TEXT,
|
|
claim_id TEXT,
|
|
PRIMARY KEY (source, native_id)
|
|
);
|
|
CREATE TABLE IF NOT EXISTS video_tags (
|
|
source TEXT NOT NULL,
|
|
native_id TEXT NOT NULL,
|
|
tag TEXT NOT NULL,
|
|
UNIQUE (source, native_id, tag)
|
|
);
|
|
CREATE TABLE IF NOT EXISTS video_languages (
|
|
source TEXT NOT NULL,
|
|
native_id TEXT NOT NULL,
|
|
language TEXT NOT NULL,
|
|
UNIQUE (source, native_id, language)
|
|
);
|
|
`
|
|
_, err := c.db.Exec(createSql)
|
|
return err
|
|
}
|
|
|
|
func (c *SyncDb) upsertTags(source, id string, tags []string) error {
|
|
upsertSql := `
|
|
INSERT INTO video_tags (
|
|
source,
|
|
native_id,
|
|
tag
|
|
) VALUES (?, ?, ?)
|
|
ON CONFLICT (source, native_id, tag)
|
|
DO NOTHING;
|
|
`
|
|
|
|
for _, tag := range tags {
|
|
_, err := c.db.Exec(
|
|
upsertSql,
|
|
source,
|
|
id,
|
|
tag,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *SyncDb) getTags(source, id string) ([]string, error) {
|
|
selectSql := `
|
|
SELECT tag
|
|
FROM video_tags
|
|
WHERE source = ? AND native_id = ?;
|
|
`
|
|
|
|
rows, err := c.db.Query(selectSql, source, id)
|
|
if err != nil {
|
|
log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err)
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var tags []string
|
|
for rows.Next() {
|
|
var tag string
|
|
err = rows.Scan(&tag)
|
|
if err != nil {
|
|
log.Error("Error deserializing tag from sync DB for %s:%s: %v", source, id, err)
|
|
return nil, err
|
|
}
|
|
tags = append(tags, tag)
|
|
}
|
|
|
|
return tags, nil
|
|
}
|
|
|
|
func (c *SyncDb) upsertLanguages(source, id string, languages []string) error {
|
|
upsertSql := `
|
|
INSERT INTO video_languages (
|
|
source,
|
|
native_id,
|
|
language
|
|
) VALUES (?, ?, ?)
|
|
ON CONFLICT (source, native_id, language)
|
|
DO NOTHING;
|
|
`
|
|
|
|
for _, language := range languages {
|
|
_, err := c.db.Exec(
|
|
upsertSql,
|
|
source,
|
|
id,
|
|
language,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", language, source, id, err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *SyncDb) getLanguages(source, id string) ([]string, error) {
|
|
selectSql := `
|
|
SELECT language
|
|
FROM video_languages
|
|
WHERE source = ? AND native_id = ?;
|
|
`
|
|
|
|
rows, err := c.db.Query(selectSql, source, id)
|
|
if err != nil {
|
|
log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err)
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var languages []string
|
|
for rows.Next() {
|
|
var language string
|
|
err = rows.Scan(&language)
|
|
if err != nil {
|
|
log.Error("Error deserializing language from sync DB for %s:%s: %v", source, id, err)
|
|
return nil, err
|
|
}
|
|
languages = append(languages, language)
|
|
}
|
|
|
|
return languages, nil
|
|
}
|
|
|
|
type SyncRecord struct {
|
|
Source string
|
|
NativeID string
|
|
Title sql.NullString
|
|
Description sql.NullString
|
|
SourceURL sql.NullString
|
|
ReleaseTime sql.NullInt64
|
|
ThumbnailURL sql.NullString
|
|
FullLocalPath sql.NullString
|
|
ClaimID sql.NullString
|
|
Tags *[]string
|
|
Languages *[]string
|
|
}
|
|
|
|
func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord {
|
|
r := SyncRecord {
|
|
Source: v.Source,
|
|
NativeID: v.ID,
|
|
SourceURL: sql.NullString { String: v.SourceURL, Valid: true },
|
|
}
|
|
|
|
if v.Title != nil {
|
|
r.Title = sql.NullString { String: *v.Title, Valid: true }
|
|
}
|
|
|
|
if v.Description != nil {
|
|
r.Description = sql.NullString { String: *v.Description, Valid: true }
|
|
}
|
|
|
|
if v.ThumbnailURL != nil {
|
|
r.ThumbnailURL = sql.NullString { String: *v.ThumbnailURL, Valid: true }
|
|
}
|
|
|
|
if v.FullLocalPath != nil {
|
|
r.FullLocalPath = sql.NullString { String: *v.FullLocalPath, Valid: true }
|
|
}
|
|
|
|
if v.ReleaseTime != nil {
|
|
r.ReleaseTime = sql.NullInt64 { Int64: *v.ReleaseTime, Valid: true }
|
|
}
|
|
|
|
if len(v.Tags) > 0 {
|
|
r.Tags = &v.Tags
|
|
}
|
|
|
|
if len(v.Languages) > 0 {
|
|
r.Languages = &v.Languages
|
|
}
|
|
|
|
return r
|
|
}
|
|
|
|
func (r *SyncRecord) ToPublishableVideo() *PublishableVideo {
|
|
if !(r.Title.Valid &&
|
|
r.Description.Valid &&
|
|
r.SourceURL.Valid &&
|
|
r.ReleaseTime.Valid &&
|
|
r.ThumbnailURL.Valid &&
|
|
r.FullLocalPath.Valid &&
|
|
r.Tags != nil &&
|
|
r.Languages != nil) {
|
|
|
|
return nil
|
|
}
|
|
|
|
video := PublishableVideo {
|
|
ID: r.NativeID,
|
|
Source: r.Source,
|
|
Description: r.Description.String,
|
|
SourceURL: r.SourceURL.String,
|
|
ReleaseTime: r.ReleaseTime.Int64,
|
|
ThumbnailURL: r.ThumbnailURL.String,
|
|
FullLocalPath: r.FullLocalPath.String,
|
|
Tags: *r.Tags,
|
|
Languages: *r.Languages,
|
|
}
|
|
|
|
return &video
|
|
}
|