From 10f9cc1ba03863723acb71e1dc659c0684619239 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 18 Sep 2018 17:28:25 -0400 Subject: [PATCH] completely remove redis db dependency improve download process --- redisdb/redisdb.go | 63 ------------------------------------ sources/youtubeVideo.go | 72 ++++++++++++++++++++++++++++++----------- ytsync.go | 24 +++----------- 3 files changed, 57 insertions(+), 102 deletions(-) delete mode 100644 redisdb/redisdb.go diff --git a/redisdb/redisdb.go b/redisdb/redisdb.go deleted file mode 100644 index 207d7c2..0000000 --- a/redisdb/redisdb.go +++ /dev/null @@ -1,63 +0,0 @@ -package redisdb - -import ( - "time" - - "github.com/lbryio/lbry.go/errors" - - "github.com/garyburd/redigo/redis" -) - -const ( - redisHashKey = "ytsync" - redisSyncedVal = "t" -) - -type DB struct { - pool *redis.Pool -} - -func New() *DB { - var r DB - r.pool = &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 5 * time.Minute, - Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Minute { - return nil - } - _, err := c.Do("PING") - return err - }, - } - return &r -} - -func (r DB) IsPublished(id string) (bool, error) { - conn := r.pool.Get() - defer conn.Close() - - alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id)) - if err != nil && err != redis.ErrNil { - return false, errors.Prefix("redis error", err) - - } - - if alreadyPublished == redisSyncedVal { - return true, nil - } - - return false, nil -} - -func (r DB) SetPublished(id string) error { - conn := r.pool.Get() - defer conn.Close() - - _, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal)) - if err != nil { - return errors.Prefix("redis error", err) - } - return nil -} diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 874a943..d67587d 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -28,6 +28,7 @@ type YoutubeVideo struct { description string playlistPosition int64 size *int64 + maxVideoSize int64 publishedAt time.Time dir string claimNames map[string]bool @@ -121,14 +122,58 @@ func (v *YoutubeVideo) download() error { return err } - var downloadedFile *os.File - downloadedFile, err = os.Create(videoPath) - defer downloadedFile.Close() - if err != nil { - return err + codec := []string{"H.264"} + ext := []string{"mp4"} + + //Filter requires a [] interface{} + codecFilter := make([]interface{}, len(codec)) + for i, v := range codec { + codecFilter[i] = v } - return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile) + //Filter requires a [] interface{} + extFilter := make([]interface{}, len(ext)) + for i, v := range ext { + extFilter[i] = v + } + + formats := videoInfo.Formats.Filter(ytdl.FormatVideoEncodingKey, codecFilter).Filter(ytdl.FormatExtensionKey, extFilter) + if len(formats) == 0 { + return errors.Err("no compatible format available for this video") + } + maxRetryAttempts := 5 + for i := 0; i < len(formats) && i < maxRetryAttempts; i++ { + formatIndex := i + if i == maxRetryAttempts-1 { + formatIndex = len(formats) - 1 + } + var downloadedFile *os.File + downloadedFile, err = os.Create(videoPath) + if err != nil { + return err + } + err = videoInfo.Download(formats[formatIndex], downloadedFile) + downloadedFile.Close() + if err != nil { + break + } + fi, err := os.Stat(v.getFilename()) + if err != nil { + return err + } + videoSize := fi.Size() + v.size = &videoSize + + if videoSize > v.maxVideoSize { + //delete the video and ignore the error + _ = v.delete() + err = errors.Err("file is too big and there is no other format available") + } else { + break + } + } + + return err } func (v *YoutubeVideo) videoDir() string { @@ -214,6 +259,8 @@ func (v *YoutubeVideo) Size() *int64 { func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { v.claimNames = claimNames v.syncedVideosMux = syncedVideosMux + v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024 + //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -221,19 +268,6 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount } log.Debugln("Downloaded " + v.id) - fi, err := os.Stat(v.getFilename()) - if err != nil { - return nil, err - } - videoSize := fi.Size() - v.size = &videoSize - - if videoSize > int64(maxVideoSize)*1024*1024 { - //delete the video and ignore the error - _ = v.delete() - return nil, errors.Err("the video is too big to sync, skipping for now") - } - err = v.triggerThumbnailSave() if err != nil { return nil, errors.Prefix("thumbnail error", err) diff --git a/ytsync.go b/ytsync.go index 9048a60..17c645e 100644 --- a/ytsync.go +++ b/ytsync.go @@ -27,7 +27,6 @@ import ( "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" @@ -77,7 +76,6 @@ type Sync struct { daemon *jsonrpc.Client claimAddress string videoDirectory string - db *redisdb.DB syncedVideosMux *sync.RWMutex syncedVideos map[string]syncedVideo claimNames map[string]bool @@ -231,7 +229,6 @@ func (s *Sync) FullCycle() (e error) { } s.syncedVideosMux = &sync.RWMutex{} s.walletMux = &sync.Mutex{} - s.db = redisdb.New() s.grp = stop.New() s.queue = make(chan video) interruptChan := make(chan os.Signal, 1) @@ -426,10 +423,10 @@ func (s *Sync) doSync() error { pubsOnDB++ } } - if pubsOnWallet > pubsOnDB { - SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) - return errors.Err("not all published videos are in the database") - } + //if pubsOnWallet > pubsOnDB { + // SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) + // return errors.Err("not all published videos are in the database") + // } if pubsOnWallet < pubsOnDB { SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) } @@ -716,19 +713,6 @@ func (s *Sync) processVideo(v video) (err error) { return nil } - //TODO: remove this after a few runs... - alreadyPublishedOld, err := s.db.IsPublished(v.ID()) - if err != nil { - return err - } - //TODO: remove this after a few runs... - if alreadyPublishedOld && !alreadyPublished { - //seems like something in the migration of blobs didn't go perfectly right so warn about it! - SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s", - s.YoutubeChannelID, v.ID()) - return nil - } - if alreadyPublished { log.Println(v.ID() + " already published") return nil