completely remove redis db dependency
improve download process
This commit is contained in:
parent
041c0cf056
commit
c8d6393c10
3 changed files with 57 additions and 102 deletions
|
@ -1,63 +0,0 @@
|
||||||
package redisdb
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/errors"
|
|
||||||
|
|
||||||
"github.com/garyburd/redigo/redis"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
redisHashKey = "ytsync"
|
|
||||||
redisSyncedVal = "t"
|
|
||||||
)
|
|
||||||
|
|
||||||
type DB struct {
|
|
||||||
pool *redis.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
func New() *DB {
|
|
||||||
var r DB
|
|
||||||
r.pool = &redis.Pool{
|
|
||||||
MaxIdle: 3,
|
|
||||||
IdleTimeout: 5 * time.Minute,
|
|
||||||
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") },
|
|
||||||
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
|
||||||
if time.Since(t) < time.Minute {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
_, err := c.Do("PING")
|
|
||||||
return err
|
|
||||||
},
|
|
||||||
}
|
|
||||||
return &r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r DB) IsPublished(id string) (bool, error) {
|
|
||||||
conn := r.pool.Get()
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id))
|
|
||||||
if err != nil && err != redis.ErrNil {
|
|
||||||
return false, errors.Prefix("redis error", err)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if alreadyPublished == redisSyncedVal {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r DB) SetPublished(id string) error {
|
|
||||||
conn := r.pool.Get()
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
_, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal))
|
|
||||||
if err != nil {
|
|
||||||
return errors.Prefix("redis error", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -28,6 +28,7 @@ type YoutubeVideo struct {
|
||||||
description string
|
description string
|
||||||
playlistPosition int64
|
playlistPosition int64
|
||||||
size *int64
|
size *int64
|
||||||
|
maxVideoSize int64
|
||||||
publishedAt time.Time
|
publishedAt time.Time
|
||||||
dir string
|
dir string
|
||||||
claimNames map[string]bool
|
claimNames map[string]bool
|
||||||
|
@ -121,14 +122,58 @@ func (v *YoutubeVideo) download() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var downloadedFile *os.File
|
codec := []string{"H.264"}
|
||||||
downloadedFile, err = os.Create(videoPath)
|
ext := []string{"mp4"}
|
||||||
defer downloadedFile.Close()
|
|
||||||
if err != nil {
|
//Filter requires a [] interface{}
|
||||||
return err
|
codecFilter := make([]interface{}, len(codec))
|
||||||
|
for i, v := range codec {
|
||||||
|
codecFilter[i] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile)
|
//Filter requires a [] interface{}
|
||||||
|
extFilter := make([]interface{}, len(ext))
|
||||||
|
for i, v := range ext {
|
||||||
|
extFilter[i] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
formats := videoInfo.Formats.Filter(ytdl.FormatVideoEncodingKey, codecFilter).Filter(ytdl.FormatExtensionKey, extFilter)
|
||||||
|
if len(formats) == 0 {
|
||||||
|
return errors.Err("no compatible format available for this video")
|
||||||
|
}
|
||||||
|
maxRetryAttempts := 5
|
||||||
|
for i := 0; i < len(formats) && i < maxRetryAttempts; i++ {
|
||||||
|
formatIndex := i
|
||||||
|
if i == maxRetryAttempts-1 {
|
||||||
|
formatIndex = len(formats) - 1
|
||||||
|
}
|
||||||
|
var downloadedFile *os.File
|
||||||
|
downloadedFile, err = os.Create(videoPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = videoInfo.Download(formats[formatIndex], downloadedFile)
|
||||||
|
downloadedFile.Close()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fi, err := os.Stat(v.getFilename())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
videoSize := fi.Size()
|
||||||
|
v.size = &videoSize
|
||||||
|
|
||||||
|
if videoSize > v.maxVideoSize {
|
||||||
|
//delete the video and ignore the error
|
||||||
|
_ = v.delete()
|
||||||
|
err = errors.Err("file is too big and there is no other format available")
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *YoutubeVideo) videoDir() string {
|
func (v *YoutubeVideo) videoDir() string {
|
||||||
|
@ -214,6 +259,8 @@ func (v *YoutubeVideo) Size() *int64 {
|
||||||
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||||
v.claimNames = claimNames
|
v.claimNames = claimNames
|
||||||
v.syncedVideosMux = syncedVideosMux
|
v.syncedVideosMux = syncedVideosMux
|
||||||
|
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
|
||||||
|
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -221,19 +268,6 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
|
||||||
}
|
}
|
||||||
log.Debugln("Downloaded " + v.id)
|
log.Debugln("Downloaded " + v.id)
|
||||||
|
|
||||||
fi, err := os.Stat(v.getFilename())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
videoSize := fi.Size()
|
|
||||||
v.size = &videoSize
|
|
||||||
|
|
||||||
if videoSize > int64(maxVideoSize)*1024*1024 {
|
|
||||||
//delete the video and ignore the error
|
|
||||||
_ = v.delete()
|
|
||||||
return nil, errors.Err("the video is too big to sync, skipping for now")
|
|
||||||
}
|
|
||||||
|
|
||||||
err = v.triggerThumbnailSave()
|
err = v.triggerThumbnailSave()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Prefix("thumbnail error", err)
|
return nil, errors.Prefix("thumbnail error", err)
|
||||||
|
|
|
@ -27,7 +27,6 @@ import (
|
||||||
"github.com/lbryio/lbry.go/jsonrpc"
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
"github.com/lbryio/lbry.go/stop"
|
"github.com/lbryio/lbry.go/stop"
|
||||||
"github.com/lbryio/lbry.go/util"
|
"github.com/lbryio/lbry.go/util"
|
||||||
"github.com/lbryio/lbry.go/ytsync/redisdb"
|
|
||||||
"github.com/lbryio/lbry.go/ytsync/sources"
|
"github.com/lbryio/lbry.go/ytsync/sources"
|
||||||
"github.com/mitchellh/go-ps"
|
"github.com/mitchellh/go-ps"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -77,7 +76,6 @@ type Sync struct {
|
||||||
daemon *jsonrpc.Client
|
daemon *jsonrpc.Client
|
||||||
claimAddress string
|
claimAddress string
|
||||||
videoDirectory string
|
videoDirectory string
|
||||||
db *redisdb.DB
|
|
||||||
syncedVideosMux *sync.RWMutex
|
syncedVideosMux *sync.RWMutex
|
||||||
syncedVideos map[string]syncedVideo
|
syncedVideos map[string]syncedVideo
|
||||||
claimNames map[string]bool
|
claimNames map[string]bool
|
||||||
|
@ -231,7 +229,6 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
}
|
}
|
||||||
s.syncedVideosMux = &sync.RWMutex{}
|
s.syncedVideosMux = &sync.RWMutex{}
|
||||||
s.walletMux = &sync.Mutex{}
|
s.walletMux = &sync.Mutex{}
|
||||||
s.db = redisdb.New()
|
|
||||||
s.grp = stop.New()
|
s.grp = stop.New()
|
||||||
s.queue = make(chan video)
|
s.queue = make(chan video)
|
||||||
interruptChan := make(chan os.Signal, 1)
|
interruptChan := make(chan os.Signal, 1)
|
||||||
|
@ -426,10 +423,10 @@ func (s *Sync) doSync() error {
|
||||||
pubsOnDB++
|
pubsOnDB++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if pubsOnWallet > pubsOnDB {
|
//if pubsOnWallet > pubsOnDB {
|
||||||
SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
|
// SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
|
||||||
return errors.Err("not all published videos are in the database")
|
// return errors.Err("not all published videos are in the database")
|
||||||
}
|
// }
|
||||||
if pubsOnWallet < pubsOnDB {
|
if pubsOnWallet < pubsOnDB {
|
||||||
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
|
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
|
||||||
}
|
}
|
||||||
|
@ -716,19 +713,6 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: remove this after a few runs...
|
|
||||||
alreadyPublishedOld, err := s.db.IsPublished(v.ID())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
//TODO: remove this after a few runs...
|
|
||||||
if alreadyPublishedOld && !alreadyPublished {
|
|
||||||
//seems like something in the migration of blobs didn't go perfectly right so warn about it!
|
|
||||||
SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s",
|
|
||||||
s.YoutubeChannelID, v.ID())
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if alreadyPublished {
|
if alreadyPublished {
|
||||||
log.Println(v.ID() + " already published")
|
log.Println(v.ID() + " already published")
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue