ytsync/local/syncDb.go
2022-01-25 09:36:46 -06:00

676 lines
13 KiB
Go

package local
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
log "github.com/sirupsen/logrus"
)
// SyncDb is a thin wrapper around the database/sql handle of the sync
// database. All persistence methods in this file hang off this type.
type SyncDb struct {
// db is the handle opened by NewSyncDb and released by Close.
db *sql.DB
}
// SyncDbSummary is the aggregate snapshot produced by SyncDb.GetSummary.
//
// Total, CachedUnpublished and UncachedUnpublished are row counts scanned
// from the summary query. LatestKnown and LatestPublished carry the largest
// release_time seen over all videos and over published videos respectively;
// they stay 0 when the query yields NULL for them.
type SyncDbSummary struct {
	Total, CachedUnpublished, UncachedUnpublished int
	LatestKnown, LatestPublished                  int64
}
// NewSyncDb opens a database handle at path using the "sqlite3" driver and
// makes sure the sync schema (see ensureSchema) exists.
//
// On success the returned *SyncDb owns the handle and must be released with
// Close. On failure the error is logged, nil is returned, and any handle that
// was already opened is closed so it does not leak.
func NewSyncDb(path string) (*SyncDb, error) {
	db, err := sql.Open("sqlite3", path)
	if err != nil {
		log.Errorf("Error opening cache DB at %s: %v", path, err)
		return nil, err
	}

	cache := &SyncDb{db: db}
	if err := cache.ensureSchema(); err != nil {
		log.Errorf("Error while ensuring sync DB structure: %v", err)
		// The caller only receives nil and therefore can never close the
		// handle itself; release it here.
		if closeErr := db.Close(); closeErr != nil {
			log.Warnf("Error closing sync DB after schema failure: %v", closeErr)
		}
		return nil, err
	}
	return cache, nil
}
// Close closes the underlying database handle and reports the error, if any,
// returned by database/sql.
func (c *SyncDb) Close() error {
	handle := c.db
	return handle.Close()
}
// SaveKnownVideo inserts a row for video into the videos table unless a row
// with the same (source, native_id) already exists, in which case the
// existing row is left untouched (ON CONFLICT ... DO NOTHING).
//
// The video is first converted with SyncRecordFromSourceVideo so optional
// fields are written as NULL when absent. Tags and languages are not stored
// by this method. An empty video.ID only produces a warning; the insert is
// still attempted.
func (c *SyncDb) SaveKnownVideo(video SourceVideo) error {
	const insertKnownVideo = `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO NOTHING;
`
	if video.ID == "" {
		log.Warnf("Trying to save a video with no ID: %v", video)
	}

	record := SyncRecordFromSourceVideo(video)
	params := []interface{}{
		record.Source,
		record.NativeID,
		record.Title,
		record.Description,
		record.SourceURL,
		record.ReleaseTime,
		record.ThumbnailURL,
		record.FullLocalPath,
	}
	_, err := c.db.Exec(insertKnownVideo, params...)
	return err
}
// SavePublishableVideo upserts video into the videos table keyed by
// (source, native_id). On conflict every column listed in the statement is
// overwritten with the new value; claim_id is not part of the statement.
//
// After the row is written, the video's tags and languages are added via
// upsertTags and upsertLanguages. The three steps are separate statements;
// the first error aborts the sequence and is returned.
func (c *SyncDb) SavePublishableVideo(video PublishableVideo) error {
	const upsertPublishable = `
INSERT INTO videos (
source,
native_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
	if _, err := c.db.Exec(
		upsertPublishable,
		video.Source,
		video.ID,
		video.ClaimName,
		video.Title,
		video.Description,
		video.SourceURL,
		video.ReleaseTime,
		video.ThumbnailURL,
		video.FullLocalPath,
	); err != nil {
		return err
	}

	if err := c.upsertTags(video.Source, video.ID, video.Tags); err != nil {
		return err
	}
	return c.upsertLanguages(video.Source, video.ID, video.Languages)
}
// SavePublishedVideo upserts video, including its claim_id and claim_name,
// into the videos table keyed by (source, native_id). On conflict all listed
// columns are overwritten.
//
// Tags and languages are then added via upsertTags and upsertLanguages. Each
// step is its own statement; the first failure is returned and later steps
// are skipped.
func (c *SyncDb) SavePublishedVideo(video PublishedVideo) error {
	const upsertPublished = `
INSERT INTO videos (
source,
native_id,
claim_id,
claim_name,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
claim_id = excluded.claim_id,
claim_name = excluded.claim_name,
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
	src, nativeID := video.Source, video.NativeID

	if _, err := c.db.Exec(
		upsertPublished,
		src,
		nativeID,
		video.ClaimID,
		video.ClaimName,
		video.Title,
		video.Description,
		video.SourceURL,
		video.ReleaseTime,
		video.ThumbnailURL,
		video.FullLocalPath,
	); err != nil {
		return err
	}

	if err := c.upsertTags(src, nativeID, video.Tags); err != nil {
		return err
	}
	return c.upsertLanguages(src, nativeID, video.Languages)
}
// MarkVideoUncached sets full_local_path to NULL for the video identified by
// (source, id). The statement result is discarded, so a call that matches no
// row is not reported as an error.
func (c *SyncDb) MarkVideoUncached(source, id string) error {
	const clearLocalPath = `
UPDATE videos
SET full_local_path = NULL
WHERE source = ? AND native_id = ?;
`
	if _, err := c.db.Exec(clearLocalPath, source, id); err != nil {
		return err
	}
	return nil
}
// _SaveVideoData upserts the descriptive columns of video (no claim columns)
// into the videos table keyed by (source, native_id), overwriting them on
// conflict, and then adds the video's tags and languages.
//
// The first failing statement aborts the sequence and its error is returned.
func (c *SyncDb) _SaveVideoData(video SourceVideo) error {
	const upsertVideoData = `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path;
`
	if _, err := c.db.Exec(
		upsertVideoData,
		video.Source,
		video.ID,
		video.Title,
		video.Description,
		video.SourceURL,
		video.ReleaseTime,
		video.ThumbnailURL,
		video.FullLocalPath,
	); err != nil {
		return err
	}

	if err := c.upsertTags(video.Source, video.ID, video.Tags); err != nil {
		return err
	}
	return c.upsertLanguages(video.Source, video.ID, video.Languages)
}
// SaveVideoPublication upserts video together with claimID into the videos
// table keyed by (source, native_id). On conflict the descriptive columns and
// claim_id are overwritten; claim_name is not part of this statement.
//
// Tags and languages are then added via upsertTags and upsertLanguages. The
// first error encountered is returned and stops the remaining steps.
func (c *SyncDb) SaveVideoPublication(video PublishableVideo, claimID string) error {
	const upsertPublication = `
INSERT INTO videos (
source,
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path,
claim_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source, native_id)
DO UPDATE SET
title = excluded.title,
description = excluded.description,
source_url = excluded.source_url,
release_time = excluded.release_time,
thumbnail_url = excluded.thumbnail_url,
full_local_path = excluded.full_local_path,
claim_id = excluded.claim_id;
`
	src, nativeID := video.Source, video.ID

	if _, err := c.db.Exec(
		upsertPublication,
		src,
		nativeID,
		video.Title,
		video.Description,
		video.SourceURL,
		video.ReleaseTime,
		video.ThumbnailURL,
		video.FullLocalPath,
		claimID,
	); err != nil {
		return err
	}

	if err := c.upsertTags(src, nativeID, video.Tags); err != nil {
		return err
	}
	return c.upsertLanguages(src, nativeID, video.Languages)
}
// IsVideoPublished reports whether the video identified by (source, id) has a
// non-NULL claim_id in the videos table, and returns that claim ID.
//
// A missing row and a NULL claim_id both yield (false, "", nil). Any other
// query error is logged and returned.
func (c *SyncDb) IsVideoPublished(source, id string) (bool, string, error) {
	const claimQuery = `
SELECT
claim_id
FROM videos
WHERE source = ? AND native_id = ?
`
	var claim sql.NullString
	switch err := c.db.QueryRow(claimQuery, source, id).Scan(&claim); {
	case err == sql.ErrNoRows:
		return false, "", nil
	case err != nil:
		log.Errorf("Error querying video publication for %s:%s from sync DB: %v", source, id, err)
		return false, "", err
	case claim.Valid:
		return true, claim.String, nil
	default:
		return false, "", nil
	}
}
// IsVideoCached reports whether the video identified by (source, id) has a
// non-NULL full_local_path in the videos table, and returns that path.
//
// A missing row and a NULL path both yield (false, "", nil). Any other query
// error is logged and returned.
func (c *SyncDb) IsVideoCached(source, id string) (bool, string, error) {
	const pathQuery = `
SELECT
full_local_path
FROM videos
WHERE source = ? AND native_id = ?
`
	var path sql.NullString
	if err := c.db.QueryRow(pathQuery, source, id).Scan(&path); err != nil {
		if err == sql.ErrNoRows {
			return false, "", nil
		}
		log.Errorf("Error querying video cache status for %s:%s from sync DB: %v", source, id, err)
		return false, "", err
	}

	if !path.Valid {
		return false, "", nil
	}
	return true, path.String, nil
}
// GetVideoRecord loads the videos row identified by (source, id) into a
// SyncRecord.
//
// When no such row exists it returns (nil, nil). Tags and Languages are only
// populated (as pointers to the loaded slices) when includeTags or
// includeLanguages is set; otherwise they stay nil. Query and scan errors are
// logged and returned.
func (c *SyncDb) GetVideoRecord(source, id string, includeTags, includeLanguages bool) (*SyncRecord, error) {
	selectSql := `
SELECT
native_id,
title,
description,
source_url,
release_time,
thumbnail_url,
full_local_path,
claim_id
FROM videos
WHERE source = ? AND native_id = ?
`
	row := c.db.QueryRow(selectSql, source, id)

	// The query filters on source but does not select it, so fill it in from
	// the argument; otherwise the returned record would carry an empty Source.
	record := SyncRecord{Source: source}
	err := row.Scan(
		&record.NativeID,
		&record.Title,
		&record.Description,
		&record.SourceURL,
		&record.ReleaseTime,
		&record.ThumbnailURL,
		&record.FullLocalPath,
		&record.ClaimID,
	)
	if err == sql.ErrNoRows {
		log.Debugf("Data for %s:%s is not in the sync DB", source, id)
		return nil, nil
	}
	if err != nil {
		log.Errorf("Error querying video data for %s:%s from sync DB: %v", source, id, err)
		return nil, err
	}

	if includeTags {
		tags, err := c.getTags(source, id)
		if err != nil {
			return nil, err
		}
		record.Tags = &tags
	}

	if includeLanguages {
		languages, err := c.getLanguages(source, id)
		if err != nil {
			return nil, err
		}
		record.Languages = &languages
	}

	return &record, nil
}
// GetUnpublishedIDs returns the native IDs of all videos from source whose
// claim_id is NULL.
//
// The returned slice is never nil. On error it holds whatever IDs were read
// before the failure, alongside the error.
func (c *SyncDb) GetUnpublishedIDs(source string) ([]string, error) {
	selectSql := `
SELECT
native_id
FROM videos
WHERE source = ? AND claim_id IS NULL
`
	ids := []string{}

	rows, err := c.db.Query(selectSql, source)
	if err != nil {
		return ids, err
	}
	defer rows.Close()

	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return ids, err
		}
		ids = append(ids, id)
	}
	// rows.Next returns false both at end-of-data and on failure; without this
	// check an iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return ids, err
	}
	return ids, nil
}
// GetSummary computes aggregate statistics over the videos table in a single
// query:
//
//   - Total: number of video rows.
//   - CachedUnpublished: rows with NULL claim_id and a non-NULL
//     full_local_path.
//   - UncachedUnpublished: rows with NULL claim_id and a NULL
//     full_local_path.
//   - LatestKnown: largest release_time over all rows.
//   - LatestPublished: largest release_time over rows with a claim_id.
//
// The two release times are left at 0 when the corresponding MAX is NULL
// (for example on an empty table). Query errors are logged and returned.
func (c *SyncDb) GetSummary() (*SyncDbSummary, error) {
	// v_unpub only joins for rows whose claim_id IS NULL, so
	// COUNT(v_unpub.native_id) is the number of unpublished videos. Subtracting
	// the cached ones leaves the uncached unpublished videos. (Counting
	// v_all.claim_id here would count *published* rows instead.)
	selectSql := `
SELECT
COUNT() AS total,
COUNT(v_unpub.full_local_path) AS cached_unpublished,
COUNT(v_unpub.native_id) - COUNT(v_unpub.full_local_path) AS uncached_unpublished,
MAX(v_all.release_time) AS latest_known,
MAX(v_pub.release_time) AS latest_published
FROM videos v_all
LEFT JOIN videos v_pub ON v_all.source = v_pub.source AND v_all.native_id = v_pub.native_id AND v_pub.claim_id IS NOT NULL
LEFT JOIN videos v_unpub ON v_all.source = v_unpub.source AND v_all.native_id = v_unpub.native_id AND v_unpub.claim_id IS NULL
`
	row := c.db.QueryRow(selectSql)

	var summary SyncDbSummary
	var latestKnown, latestPublished sql.NullInt64
	err := row.Scan(
		&summary.Total,
		&summary.CachedUnpublished,
		&summary.UncachedUnpublished,
		&latestKnown,
		&latestPublished,
	)
	if err != nil {
		log.Errorf("Error querying sync DB summary: %v", err)
		return nil, err
	}

	if latestKnown.Valid {
		summary.LatestKnown = latestKnown.Int64
	}
	if latestPublished.Valid {
		summary.LatestPublished = latestPublished.Int64
	}

	return &summary, nil
}
// ensureSchema creates the videos, video_tags and video_languages tables if
// they do not exist yet. Existing tables are left as they are (CREATE TABLE
// IF NOT EXISTS), so this does not migrate an older layout.
func (c *SyncDb) ensureSchema() error {
	const schema = `
CREATE TABLE IF NOT EXISTS videos (
source TEXT,
native_id TEXT,
title TEXT,
description TEXT,
source_url TEXT,
release_time INT,
thumbnail_url TEXT,
full_local_path TEXT,
claim_name TEXT,
claim_id TEXT,
PRIMARY KEY (source, native_id)
);
CREATE TABLE IF NOT EXISTS video_tags (
source TEXT NOT NULL,
native_id TEXT NOT NULL,
tag TEXT NOT NULL,
UNIQUE (source, native_id, tag)
);
CREATE TABLE IF NOT EXISTS video_languages (
source TEXT NOT NULL,
native_id TEXT NOT NULL,
language TEXT NOT NULL,
UNIQUE (source, native_id, language)
);
`
	if _, err := c.db.Exec(schema); err != nil {
		return err
	}
	return nil
}
// upsertTags adds each entry of tags to video_tags for the video identified
// by (source, id). Tags that are already present are skipped (ON CONFLICT ...
// DO NOTHING); tags missing from the slice are not removed.
//
// Each tag is inserted with its own statement. The first failure is logged
// and returned, leaving earlier inserts in place.
func (c *SyncDb) upsertTags(source, id string, tags []string) error {
	const insertTag = `
INSERT INTO video_tags (
source,
native_id,
tag
) VALUES (?, ?, ?)
ON CONFLICT (source, native_id, tag)
DO NOTHING;
`
	for i := range tags {
		tag := tags[i]
		if _, err := c.db.Exec(insertTag, source, id, tag); err != nil {
			log.Errorf("Error inserting tag %s into sync DB for %s:%s: %v", tag, source, id, err)
			return err
		}
	}
	return nil
}
// getTags returns every tag stored in video_tags for the video identified by
// (source, id). The result is a nil slice when the video has no tags.
//
// Query, scan and iteration errors are logged and returned with a nil slice.
func (c *SyncDb) getTags(source, id string) ([]string, error) {
	selectSql := `
SELECT tag
FROM video_tags
WHERE source = ? AND native_id = ?;
`
	rows, err := c.db.Query(selectSql, source, id)
	if err != nil {
		log.Errorf("Error getting tags from sync DB for %s:%s: %v", source, id, err)
		return nil, err
	}
	defer rows.Close()

	var tags []string
	for rows.Next() {
		var tag string
		if err := rows.Scan(&tag); err != nil {
			// Errorf, not Error: the message contains format verbs.
			log.Errorf("Error deserializing tag from sync DB for %s:%s: %v", source, id, err)
			return nil, err
		}
		tags = append(tags, tag)
	}
	// rows.Next also returns false on failure; surface that instead of
	// returning a silently truncated tag list.
	if err := rows.Err(); err != nil {
		log.Errorf("Error iterating tags from sync DB for %s:%s: %v", source, id, err)
		return nil, err
	}
	return tags, nil
}
// upsertLanguages adds each entry of languages to video_languages for the
// video identified by (source, id). Languages already present are skipped (ON
// CONFLICT ... DO NOTHING); languages missing from the slice are not removed.
//
// Each language is inserted with its own statement. The first failure is
// logged and returned, leaving earlier inserts in place.
func (c *SyncDb) upsertLanguages(source, id string, languages []string) error {
	const insertLanguage = `
INSERT INTO video_languages (
source,
native_id,
language
) VALUES (?, ?, ?)
ON CONFLICT (source, native_id, language)
DO NOTHING;
`
	for i := range languages {
		lang := languages[i]
		if _, err := c.db.Exec(insertLanguage, source, id, lang); err != nil {
			log.Errorf("Error inserting language %s into sync DB for %s:%s: %v", lang, source, id, err)
			return err
		}
	}
	return nil
}
// getLanguages returns every language stored in video_languages for the video
// identified by (source, id). The result is a nil slice when the video has no
// languages.
//
// Query, scan and iteration errors are logged and returned with a nil slice.
func (c *SyncDb) getLanguages(source, id string) ([]string, error) {
	selectSql := `
SELECT language
FROM video_languages
WHERE source = ? AND native_id = ?;
`
	rows, err := c.db.Query(selectSql, source, id)
	if err != nil {
		log.Errorf("Error getting languages from sync DB for %s:%s: %v", source, id, err)
		return nil, err
	}
	defer rows.Close()

	var languages []string
	for rows.Next() {
		var language string
		if err := rows.Scan(&language); err != nil {
			// Errorf, not Error: the message contains format verbs.
			log.Errorf("Error deserializing language from sync DB for %s:%s: %v", source, id, err)
			return nil, err
		}
		languages = append(languages, language)
	}
	// rows.Next also returns false on failure; surface that instead of
	// returning a silently truncated language list.
	if err := rows.Err(); err != nil {
		log.Errorf("Error iterating languages from sync DB for %s:%s: %v", source, id, err)
		return nil, err
	}
	return languages, nil
}
// SyncRecord is the in-memory form of one videos row plus, optionally, its
// tags and languages.
//
// Nullable columns use the database/sql Null* wrappers so that "absent" and
// "empty" can be told apart. Tags and Languages are nil when they were not
// loaded or not provided.
type SyncRecord struct {
	// Key of the row: the originating source and the ID native to it.
	Source, NativeID string

	// Descriptive columns; Valid is false when the column is NULL.
	Title, Description, SourceURL sql.NullString
	ReleaseTime                   sql.NullInt64
	ThumbnailURL, FullLocalPath   sql.NullString

	// ClaimID mirrors the claim_id column.
	ClaimID sql.NullString

	// Optional associated data.
	Tags, Languages *[]string
}
// SyncRecordFromSourceVideo converts v into a SyncRecord.
//
// Source, NativeID and SourceURL are always copied (SourceURL is always marked
// valid). Each optional pointer field of v becomes a valid Null* value when
// the pointer is non-nil and stays invalid (NULL) otherwise. Tags and
// Languages are only set when the corresponding slice is non-empty. ClaimID is
// never set here.
func SyncRecordFromSourceVideo(v SourceVideo) SyncRecord {
	// optional maps a possibly-nil string pointer onto its nullable form.
	optional := func(s *string) sql.NullString {
		if s == nil {
			return sql.NullString{}
		}
		return sql.NullString{String: *s, Valid: true}
	}

	rec := SyncRecord{
		Source:        v.Source,
		NativeID:      v.ID,
		SourceURL:     sql.NullString{String: v.SourceURL, Valid: true},
		Title:         optional(v.Title),
		Description:   optional(v.Description),
		ThumbnailURL:  optional(v.ThumbnailURL),
		FullLocalPath: optional(v.FullLocalPath),
	}

	if v.ReleaseTime != nil {
		rec.ReleaseTime = sql.NullInt64{Int64: *v.ReleaseTime, Valid: true}
	}
	if len(v.Tags) != 0 {
		rec.Tags = &v.Tags
	}
	if len(v.Languages) != 0 {
		rec.Languages = &v.Languages
	}

	return rec
}
// ToPublishableVideo converts r into a PublishableVideo.
//
// It returns nil unless every nullable field other than ClaimID is valid and
// both Tags and Languages are non-nil. ClaimName is not populated because
// SyncRecord carries no claim name.
func (r *SyncRecord) ToPublishableVideo() *PublishableVideo {
	complete := r.Title.Valid &&
		r.Description.Valid &&
		r.SourceURL.Valid &&
		r.ReleaseTime.Valid &&
		r.ThumbnailURL.Valid &&
		r.FullLocalPath.Valid &&
		r.Tags != nil &&
		r.Languages != nil
	if !complete {
		return nil
	}

	video := PublishableVideo{
		ID:     r.NativeID,
		Source: r.Source,
		// Title is required above, so it must be carried over as well;
		// previously it was validated but then dropped.
		Title:         r.Title.String,
		Description:   r.Description.String,
		SourceURL:     r.SourceURL.String,
		ReleaseTime:   r.ReleaseTime.Int64,
		ThumbnailURL:  r.ThumbnailURL.String,
		FullLocalPath: r.FullLocalPath.String,
		Tags:          *r.Tags,
		Languages:     *r.Languages,
	}
	return &video
}