From 039ed1a7eb4cc9b060d704c54eb303a727d79235 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 28 Dec 2017 12:14:33 -0500 Subject: [PATCH] big refactor, abort on ctrl-c, proper wallet init and backup --- README.md | 9 +- count.go | 31 ++ redisdb/redisdb.go | 62 ++++ setup.go | 257 ++++++++++++++++ sources/youtubeVideo.go | 212 +++++++++++++ video.go | 69 ----- ytsync.go | 640 +++++++++------------------------------- 7 files changed, 713 insertions(+), 567 deletions(-) create mode 100644 count.go create mode 100644 redisdb/redisdb.go create mode 100644 setup.go create mode 100644 sources/youtubeVideo.go delete mode 100644 video.go diff --git a/README.md b/README.md index 7f2fc39..7d6ba8d 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,10 @@ -# Current YT Sync Process +# YT Sync Process -- make sure you have a clean `.lbryum` dir (delete existing dir if there's nothing you need there) +- make sure you don't have a `.lbryum/wallets/default_wallet` + - delete existing wallet if there's nothing you need there, or better yet, move it somewhere else in case you need it later - make sure daemon is stopped and can be controlled with `systemctl` - run `lbry ytsync YOUTUBE_KEY YOUTUBE_CHANNEL_ID LBRY_CHANNEL_NAME --max-tries=5` - - `max-tries` will retry errors that you will undoubtedly get + - `max-tries` will retry errors that you will probably get (e.g. failed publishes) - after sync is complete, daemon will be stopped and wallet will be moved to `~/wallets/` - now mark content as synced in doc @@ -14,4 +15,4 @@ content that was put on Youtube since the last sync. Add this to cron to delete synced videos that have been published: -`*/10 * * * * /usr/bin/find /tmp/ ! -readable -prune -o -name '*ytsync*' -mmin +20 -print0 | xargs -0 --no-run-if-empty rm -r` +`*/10 * * * * (/bin/ls /tmp/ | /bin/grep -q ytsync && /usr/bin/find /tmp/ytsync* -mmin +20 -delete) || true diff --git a/count.go b/count.go new file mode 100644 index 0000000..83a5000 --- /dev/null +++ b/count.go @@ -0,0 +1,31 @@ +package ytsync + +import ( + "net/http" + + "github.com/go-errors/errors" + "google.golang.org/api/googleapi/transport" + "google.golang.org/api/youtube/v3" +) + +func (s *Sync) CountVideos() (uint64, error) { + client := &http.Client{ + Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, + } + + service, err := youtube.New(client) + if err != nil { + return 0, errors.WrapPrefix(err, "error creating YouTube service", 0) + } + + response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do() + if err != nil { + return 0, errors.WrapPrefix(err, "error getting channels", 0) + } + + if len(response.Items) < 1 { + return 0, errors.New("youtube channel not found") + } + + return response.Items[0].Statistics.VideoCount, nil +} diff --git a/redisdb/redisdb.go b/redisdb/redisdb.go new file mode 100644 index 0000000..d9187b6 --- /dev/null +++ b/redisdb/redisdb.go @@ -0,0 +1,62 @@ +package redisdb + +import ( + "time" + + "github.com/garyburd/redigo/redis" + "github.com/go-errors/errors" +) + +const ( + redisHashKey = "ytsync" + redisSyncedVal = "t" +) + +type DB struct { + pool *redis.Pool +} + +func New() *DB { + var r DB + r.pool = &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 5 * time.Minute, + Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + if time.Since(t) < time.Minute { + return nil + } + _, err := c.Do("PING") + return err + }, + } + return &r +} + +func (r DB) IsPublished(id string) (bool, error) { + conn := r.pool.Get() + defer conn.Close() + + alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id)) + if err != nil && err != redis.ErrNil { + return false, errors.WrapPrefix(err, "redis error", 0) + + } + + if alreadyPublished == redisSyncedVal { + return true, nil + } + + return false, nil +} + +func (r DB) SetPublished(id string) error { + conn := r.pool.Get() + defer conn.Close() + + _, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal)) + if err != nil { + return errors.New("redis error: " + err.Error()) + } + return nil +} diff --git a/setup.go b/setup.go new file mode 100644 index 0000000..8fb652f --- /dev/null +++ b/setup.go @@ -0,0 +1,257 @@ +package ytsync + +import ( + "strings" + "time" + + "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/lbrycrd" + + "github.com/go-errors/errors" + "github.com/shopspring/decimal" + log "github.com/sirupsen/logrus" +) + +func (s *Sync) walletSetup() error { + balanceResp, err := s.daemon.WalletBalance() + if err != nil { + return err + } else if balanceResp == nil { + return errors.New("no response") + } + balance := decimal.Decimal(*balanceResp) + log.Debugf("Starting balance is %s", balance.String()) + + numOnSource, err := s.CountVideos() + if err != nil { + return err + } + log.Debugf("Source channel has %d videos", numOnSource) + + numPublished := uint64(0) + if s.LbryChannelName != "" { + numPublished, err = s.daemon.NumClaimsInChannel(s.LbryChannelName) + if err != nil { + return err + } + } + log.Debugf("We already published %d videos", numPublished) + + minBalance := (float64(numOnSource)-float64(numPublished))*publishAmount + channelClaimAmount + amountToAdd, _ := decimal.NewFromFloat(minBalance).Sub(balance).Float64() + + if amountToAdd > 0 { + addressResp, err := s.daemon.WalletUnusedAddress() + if err != nil { + return err + } else if addressResp == nil { + return errors.New("no response") + } + address := string(*addressResp) + + amountToAdd *= 1.5 // add 50% margin for fees, future publishes, etc + log.Printf("Adding %f credits", amountToAdd) + lbrycrdd, err := lbrycrd.NewWithDefaultURL() + if err != nil { + return err + } + + _, err = lbrycrdd.SimpleSend(address, amountToAdd) + if err != nil { + return err + } + + wait := 15 * time.Second + log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction") + time.Sleep(wait) + + log.Println("Waiting for transaction to be confirmed") + err = s.waitUntilUTXOsConfirmed() + if err != nil { + return err + } + } + + claimAddress, err := s.daemon.WalletUnusedAddress() + if err != nil { + return err + } else if claimAddress == nil { + return errors.New("could not get unused address") + } + s.claimAddress = string(*claimAddress) + if s.claimAddress == "" { + return errors.New("found blank claim address") + } + + err = s.ensureEnoughUTXOs() + if err != nil { + return err + } + + if s.LbryChannelName != "" { + err = s.ensureChannelOwnership() + if err != nil { + return err + } + } + + balanceResp, err = s.daemon.WalletBalance() + if err != nil { + return err + } else if balanceResp == nil { + return errors.New("no response") + } + log.Println("starting with " + decimal.Decimal(*balanceResp).String() + "LBC") + + return nil +} + +func (s *Sync) ensureEnoughUTXOs() error { + utxolist, err := s.daemon.UTXOList() + if err != nil { + return err + } else if utxolist == nil { + return errors.New("no response") + } + + if !allUTXOsConfirmed(utxolist) { + log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run + s.waitUntilUTXOsConfirmed() + } + + target := 50 + count := 0 + + for _, utxo := range *utxolist { + if !utxo.IsClaim && !utxo.IsSupport && !utxo.IsUpdate && utxo.Amount.Cmp(decimal.New(0, 0)) == 1 { + count++ + } + } + + if count < target { + newAddresses := target - count + + balance, err := s.daemon.WalletBalance() + if err != nil { + return err + } else if balance == nil { + return errors.New("no response") + } + + log.Println("balance is " + decimal.Decimal(*balance).String()) + + amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target))) + log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses) + prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true) + if err != nil { + return err + } else if prefillTx == nil { + return errors.New("no response") + } else if !prefillTx.Complete || !prefillTx.Broadcast { + return errors.New("failed to prefill addresses") + } + + wait := 15 * time.Second + log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses") + time.Sleep(wait) + + log.Println("Creating UTXOs and waiting for them to be confirmed") + err = s.waitUntilUTXOsConfirmed() + if err != nil { + return err + } + } + + return nil +} + +func (s *Sync) waitUntilUTXOsConfirmed() error { + for { + r, err := s.daemon.UTXOList() + if err != nil { + return err + } else if r == nil { + return errors.New("no response") + } + + if allUTXOsConfirmed(r) { + return nil + } + + wait := 30 * time.Second + log.Println("Waiting " + wait.String() + "...") + time.Sleep(wait) + } +} + +func (s *Sync) ensureChannelOwnership() error { + if s.LbryChannelName == "" { + return errors.New("no channel name set") + } + + channels, err := s.daemon.ChannelListMine() + if err != nil { + return err + } else if channels == nil { + return errors.New("no channel response") + } + + isChannelMine := false + for _, channel := range *channels { + if channel.Name == s.LbryChannelName { + isChannelMine = true + } else { + return errors.New("this wallet has multiple channels. maybe something went wrong during setup?") + } + } + if isChannelMine { + return nil + } + + resolveResp, err := s.daemon.Resolve(s.LbryChannelName) + if err != nil { + return err + } + + channel := (*resolveResp)[s.LbryChannelName] + channelBidAmount := channelClaimAmount + + channelNotFound := channel.Error != nil && strings.Contains(*(channel.Error), "cannot be resolved") + if !channelNotFound { + if !s.TakeOverExistingChannel { + return errors.New("Channel exists and we don't own it. Pick another channel.") + } + log.Println("Channel exists and we don't own it. Outbidding existing claim.") + channelBidAmount, _ = channel.Certificate.Amount.Add(decimal.NewFromFloat(channelClaimAmount)).Float64() + } + + _, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) + if err != nil { + return err + } + + // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" + wait := 15 * time.Second + log.Println("Waiting " + wait.String() + " for channel claim to go through") + time.Sleep(wait) + + return nil +} + +func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool { + if utxolist == nil { + return false + } + + if len(*utxolist) < 1 { + return false + } + + for _, utxo := range *utxolist { + if utxo.Height == 0 { + return false + } + } + + return true +} diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go new file mode 100644 index 0000000..88e19c9 --- /dev/null +++ b/sources/youtubeVideo.go @@ -0,0 +1,212 @@ +package sources + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/lbryio/lbry.go/jsonrpc" + + "github.com/go-errors/errors" + ytdl "github.com/kkdai/youtube" + log "github.com/sirupsen/logrus" + "google.golang.org/api/youtube/v3" +) + +type YoutubeVideo struct { + id string + channelTitle string + title string + description string + playlistPosition int64 + publishedAt time.Time + dir string +} + +func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo { + publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors + return YoutubeVideo{ + id: snippet.ResourceId.VideoId, + title: snippet.Title, + description: snippet.Description, + channelTitle: snippet.ChannelTitle, + playlistPosition: snippet.Position, + publishedAt: publishedAt, + dir: directory, + } +} + +func (v YoutubeVideo) ID() string { + return v.id +} + +func (v YoutubeVideo) IDAndNum() string { + return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)" +} + +func (v YoutubeVideo) PublishedAt() time.Time { + return v.publishedAt +} + +func (v YoutubeVideo) getFilename() string { + return v.dir + "/" + v.id + ".mp4" +} + +func (v YoutubeVideo) getClaimName() string { + maxLen := 40 + reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) + + chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-") + + name := chunks[0] + if len(name) > maxLen { + return name[:maxLen] + } + + for _, chunk := range chunks[1:] { + tmpName := name + "-" + chunk + if len(tmpName) > maxLen { + if len(name) < 20 { + name = tmpName[:maxLen] + } + break + } + name = tmpName + } + + return name +} + +func (v YoutubeVideo) getAbbrevDescription() string { + maxLines := 10 + description := strings.TrimSpace(v.description) + if strings.Count(description, "\n") < maxLines { + return description + } + return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." +} + +func (v YoutubeVideo) Download() error { + videoPath := v.getFilename() + + _, err := os.Stat(videoPath) + if err != nil && !os.IsNotExist(err) { + return err + } else if err == nil { + log.Debugln(v.id + " already exists at " + videoPath) + return nil + } + + downloader := ytdl.NewYoutube(false) + err = downloader.DecodeURL("https://www.youtube.com/watch?v=" + v.id) + if err != nil { + return err + } + err = downloader.StartDownload(videoPath) + if err != nil { + return err + } + return nil +} + +func (v YoutubeVideo) TriggerThumbnailSave() error { + client := &http.Client{Timeout: 30 * time.Second} + + params, err := json.Marshal(map[string]string{"videoid": v.id}) + if err != nil { + return err + } + + request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params)) + if err != nil { + return err + } + + response, err := client.Do(request) + if err != nil { + return err + } + defer response.Body.Close() + + contents, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + + var decoded struct { + error int `json:"error"` + url string `json:"url,omitempty"` + message string `json:"message,omitempty"` + } + err = json.Unmarshal(contents, &decoded) + if err != nil { + return err + } + + if decoded.error != 0 { + return errors.New("error creating thumbnail: " + decoded.message) + } + + return nil +} + +func strPtr(s string) *string { return &s } + +func (v YoutubeVideo) Publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error { + options := jsonrpc.PublishOptions{ + Title: &v.title, + Author: &v.channelTitle, + Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id), + Language: strPtr("en"), + ClaimAddress: &claimAddress, + Thumbnail: strPtr("http://berk.ninja/thumbnails/" + v.id), + License: strPtr("Copyrighted (contact author)"), + } + if channelName != "" { + options.ChannelName = &channelName + } + + _, err := daemon.Publish(v.getClaimName(), v.getFilename(), amount, options) + return err +} + +func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error { + //download and thumbnail can be done in parallel + err := v.Download() + if err != nil { + return errors.WrapPrefix(err, "download error", 0) + } + log.Debugln("Downloaded " + v.id) + + err = v.TriggerThumbnailSave() + if err != nil { + return errors.WrapPrefix(err, "thumbnail error", 0) + } + log.Debugln("Created thumbnail for " + v.id) + + err = v.Publish(daemon, claimAddress, amount, channelName) + if err != nil { + return errors.WrapPrefix(err, "publish error", 0) + } + + return nil +} + +// sorting videos +//type ByPublishedAt []YoutubeVideo +// +//func (a ByPublishedAt) Len() int { return len(a) } +//func (a ByPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +//func (a ByPublishedAt) Less(i, j int) bool { return a[i].publishedAt.Before(a[j].publishedAt) } +// +//type ByPlaylistPosition []YoutubeVideo +// +//func (a ByPlaylistPosition) Len() int { return len(a) } +//func (a ByPlaylistPosition) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +//func (a ByPlaylistPosition) Less(i, j int) bool { return a[i].playlistPosition < a[j].playlistPosition } diff --git a/video.go b/video.go deleted file mode 100644 index b7622a0..0000000 --- a/video.go +++ /dev/null @@ -1,69 +0,0 @@ -package ytsync - -import ( - "regexp" - "strings" - "time" -) - -type video struct { - id string - channelID string - channelTitle string - title string - description string - playlistPosition int64 - publishedAt time.Time - dir string -} - -func (v video) getFilename() string { - return v.dir + "/" + v.id + ".mp4" -} - -func (v video) getClaimName() string { - maxLen := 40 - reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) - - chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-") - - name := chunks[0] - if len(name) > maxLen { - return name[:maxLen] - } - - for _, chunk := range chunks[1:] { - tmpName := name + "-" + chunk - if len(tmpName) > maxLen { - if len(name) < 20 { - name = tmpName[:maxLen] - } - break - } - name = tmpName - } - - return name -} - -func (v video) getAbbrevDescription() string { - maxLines := 10 - description := strings.TrimSpace(v.description) - if strings.Count(description, "\n") < maxLines { - return description - } - return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." -} - -// sorting videos -type byPublishedAt []video - -func (a byPublishedAt) Len() int { return len(a) } -func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byPublishedAt) Less(i, j int) bool { return a[i].publishedAt.Before(a[j].publishedAt) } - -type byPlaylistPosition []video - -func (a byPlaylistPosition) Len() int { return len(a) } -func (a byPlaylistPosition) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byPlaylistPosition) Less(i, j int) bool { return a[i].playlistPosition < a[j].playlistPosition } diff --git a/ytsync.go b/ytsync.go index 237abc9..9056da3 100644 --- a/ytsync.go +++ b/ytsync.go @@ -1,38 +1,47 @@ package ytsync import ( - "bytes" - "encoding/json" "io/ioutil" "net/http" "os" "os/exec" + "os/signal" "sort" - "strconv" "strings" "sync" - "sync/atomic" + "syscall" "time" "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/ytsync/redisdb" + "github.com/lbryio/lbry.go/ytsync/sources" - "github.com/garyburd/redigo/redis" "github.com/go-errors/errors" - ytdl "github.com/kkdai/youtube" - "github.com/lbryio/lbry.go/lbrycrd" - "github.com/shopspring/decimal" log "github.com/sirupsen/logrus" "google.golang.org/api/googleapi/transport" "google.golang.org/api/youtube/v3" ) const ( - redisHashKey = "ytsync" - redisSyncedVal = "t" channelClaimAmount = 0.01 publishAmount = 0.01 ) +type video interface { + ID() string + IDAndNum() string + PublishedAt() time.Time + Sync(*jsonrpc.Client, string, float64, string) error +} + +// sorting videos +type byPublishedAt []video + +func (a byPublishedAt) Len() int { return len(a) } +func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[j].PublishedAt()) } + // Sync stores the options that control how syncing happens type Sync struct { YoutubeAPIKey string @@ -46,117 +55,28 @@ type Sync struct { daemon *jsonrpc.Client claimAddress string videoDirectory string - redisPool *redis.Pool -} + db *redisdb.DB -func (s *Sync) initDaemon() { - if s.daemon == nil { - s.daemon = jsonrpc.NewClient("") - log.Infoln("Waiting for daemon to finish starting...") - for { - _, err := s.daemon.WalletBalance() - if err == nil { - break - } - time.Sleep(5 * time.Second) - } - } -} + stop *stopOnce.Stopper -func (s *Sync) init() error { - var err error - - s.redisPool = &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 5 * time.Minute, - Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Minute { - return nil - } - _, err := c.Do("PING") - return err - }, - } - - s.videoDirectory, err = ioutil.TempDir("", "ytsync") - if err != nil { - return errors.Wrap(err, 0) - } - - s.initDaemon() - - address, err := s.daemon.WalletUnusedAddress() - if err != nil { - return err - } else if address == nil { - return errors.New("could not get unused address") - } - s.claimAddress = string(*address) - if s.claimAddress == "" { - return errors.New("found blank claim address") - } - - err = s.ensureEnoughUTXOs() - if err != nil { - return err - } - - if s.LbryChannelName != "" { - err = s.ensureChannelOwnership() - if err != nil { - return err - } - } - - balance, err := s.daemon.WalletBalance() - if err != nil { - return err - } else if balance == nil { - return errors.New("no response") - } - log.Println("starting with " + decimal.Decimal(*balance).String() + "LBC") - - return nil -} - -func (s *Sync) CountVideos() (uint64, error) { - client := &http.Client{ - Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, - } - - service, err := youtube.New(client) - if err != nil { - return 0, errors.WrapPrefix(err, "error creating YouTube service", 0) - } - - response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do() - if err != nil { - return 0, errors.WrapPrefix(err, "error getting channels", 0) - } - - if len(response.Items) < 1 { - return 0, errors.New("youtube channel not found") - } - - return response.Items[0].Statistics.VideoCount, nil + wg sync.WaitGroup + queue chan video } func (s *Sync) FullCycle() error { + var err error if os.Getenv("HOME") == "" { return errors.New("no $HOME env var found") } - newChannel := true defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) - if _, err := os.Stat(walletBackupDir); !os.IsNotExist(err) { + if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) { if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { return errors.New("Tried to continue previous upload, but default_wallet already exists") } - newChannel = false err = os.Rename(walletBackupDir, defaultWalletDir) if err != nil { return errors.Wrap(err, 0) @@ -164,291 +84,136 @@ func (s *Sync) FullCycle() error { log.Println("Continuing previous upload") } - err := s.startDaemonViaSystemd() - if err != nil { - return err - } - - if newChannel { - s.initDaemon() - - addressResp, err := s.daemon.WalletUnusedAddress() - if err != nil { - return err - } else if addressResp == nil { - return errors.New("no response") + defer func() { + log.Printf("Stopping daemon") + shutdownErr := stopDaemonViaSystemd() + if shutdownErr != nil { + log.Errorf("error shutting down daemon: %v", shutdownErr) + log.Errorf("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR", shutdownErr) + } else { + walletErr := os.Rename(defaultWalletDir, walletBackupDir) + if walletErr != nil { + log.Errorf("error moving wallet to backup dir: %v", walletErr) + } } - address := string(*addressResp) + }() - count, err := s.CountVideos() - if err != nil { - return err - } - initialAmount := float64(count)*publishAmount + channelClaimAmount - initialAmount += initialAmount * 0.1 // add 10% margin for fees, etc - - log.Printf("Loading wallet with %f initial credits", initialAmount) - lbrycrdd, err := lbrycrd.NewWithDefaultURL() - if err != nil { - return err - } - lbrycrdd.SimpleSend(address, initialAmount) - //lbrycrdd.SendWithSplit(address, initialAmount, 50) - - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction") - time.Sleep(wait) - - log.Println("Waiting for transaction to be confirmed") - err = s.waitUntilUTXOsConfirmed() - if err != nil { - return err - } - } - - err = s.Go() - if err != nil { - return err - } - - // wait for reflection to finish??? - wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing - log.Println("Waiting " + wait.String() + " to finish reflecting everything") - time.Sleep(wait) - - log.Printf("Stopping daemon") - err = s.stopDaemonViaSystemd() - if err != nil { - return err - } - - err = os.Rename(defaultWalletDir, walletBackupDir) + s.videoDirectory, err = ioutil.TempDir("", "ytsync") if err != nil { return errors.Wrap(err, 0) } + s.db = redisdb.New() + s.stop = stopOnce.New() + s.queue = make(chan video) + + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + go func() { + <-interruptChan + s.stop.Stop() + }() + + log.Printf("Starting daemon") + err = startDaemonViaSystemd() + if err != nil { + return err + } + + log.Infoln("Waiting for daemon to finish starting...") + s.daemon = jsonrpc.NewClientAndWait("") + + err = s.doSync() + if err != nil { + return err + } else { + // wait for reflection to finish??? + wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing + log.Println("Waiting " + wait.String() + " to finish reflecting everything") + time.Sleep(wait) + } + return nil } -func (s *Sync) Go() error { +func (s *Sync) doSync() error { var err error - err = s.init() + err = s.walletSetup() if err != nil { return err } - var wg sync.WaitGroup - videoQueue := make(chan video) - - queueStopChan := make(chan struct{}) - sendStopEnqueuing := sync.Once{} - - var videoErrored atomic.Value - videoErrored.Store(false) if s.StopOnError { log.Println("Will stop publishing if an error is detected") } for i := 0; i < s.ConcurrentVideos; i++ { - go func() { - wg.Add(1) - defer wg.Done() - - for { - v, more := <-videoQueue - if !more { - return - } - if s.StopOnError && videoErrored.Load().(bool) { - log.Println("Video errored. Exiting") - return - } - - log.Println("========================================") - - tryCount := 0 - for { - tryCount++ - err := s.processVideo(v) - - if err != nil { - log.Errorln("error processing video: " + err.Error()) - if s.StopOnError { - videoErrored.Store(true) - sendStopEnqueuing.Do(func() { - queueStopChan <- struct{}{} - }) - } else if s.MaxTries > 1 { - if strings.Contains(err.Error(), "non 200 status code received") || - strings.Contains(err.Error(), " reason: 'This video contains content from") || - strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") { - log.Println("This error should not be retried at all") - } else if tryCount >= s.MaxTries { - log.Println("Video failed after " + strconv.Itoa(s.MaxTries) + " retries, exiting") - videoErrored.Store(true) - sendStopEnqueuing.Do(func() { - queueStopChan <- struct{}{} - }) - } else { - log.Println("Retrying") - continue - } - } - } - break - } - } - }() + go s.startWorker(i) } - err = s.enqueueVideosFromChannel(s.YoutubeChannelID, &videoQueue, &queueStopChan) - close(videoQueue) - wg.Wait() + err = s.enqueueVideos() + close(s.queue) + s.wg.Wait() return err } -func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool { - if utxolist == nil { - return false - } +func (s *Sync) startWorker(workerNum int) { + s.wg.Add(1) + defer s.wg.Done() - if len(*utxolist) < 1 { - return false - } else { - for _, utxo := range *utxolist { - if utxo.Height == 0 { - return false - } - } - } + var v video + var more bool - return true -} - -func (s *Sync) ensureEnoughUTXOs() error { - utxolist, err := s.daemon.UTXOList() - if err != nil { - return err - } else if utxolist == nil { - return errors.New("no response") - } - - if !allUTXOsConfirmed(utxolist) { - log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run - s.waitUntilUTXOsConfirmed() - } - - target := 50 - count := 0 - - for _, utxo := range *utxolist { - if !utxo.IsClaim && !utxo.IsSupport && !utxo.IsUpdate && utxo.Amount.Cmp(decimal.New(0, 0)) == 1 { - count++ - } - } - - if count < target { - newAddresses := target - count - - balance, err := s.daemon.WalletBalance() - if err != nil { - return err - } else if balance == nil { - return errors.New("no response") - } - - log.Println("balance is " + decimal.Decimal(*balance).String()) - - amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target))) - log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses) - prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true) - if err != nil { - return err - } else if prefillTx == nil { - return errors.New("no response") - } else if !prefillTx.Complete || !prefillTx.Broadcast { - return errors.New("failed to prefill addresses") - } - - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses") - time.Sleep(wait) - - log.Println("Creating UTXOs and waiting for them to be confirmed") - err = s.waitUntilUTXOsConfirmed() - if err != nil { - return err - } - } - - return nil -} - -func (s *Sync) waitUntilUTXOsConfirmed() error { for { - r, err := s.daemon.UTXOList() - if err != nil { - return err - } else if r == nil { - return errors.New("no response") + select { + case <-s.stop.Chan(): + log.Printf("Stopping worker %d", workerNum) + return + default: } - if allUTXOsConfirmed(r) { - return nil + select { + case v, more = <-s.queue: + if !more { + return + } + case <-s.stop.Chan(): + log.Printf("Stopping worker %d", workerNum) + return } - wait := 30 * time.Second - log.Println("Waiting " + wait.String() + "...") - time.Sleep(wait) + log.Println("========================================") + + tryCount := 0 + for { + tryCount++ + err := s.processVideo(v) + + if err != nil { + log.Errorln("error processing video: " + err.Error()) + if s.StopOnError { + s.stop.Stop() + } else if s.MaxTries > 1 { + if strings.Contains(err.Error(), "non 200 status code received") || + strings.Contains(err.Error(), " reason: 'This video contains content from") || + strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") { + log.Println("This error should not be retried at all") + } else if tryCount >= s.MaxTries { + log.Printf("Video failed after %d retries, exiting", s.MaxTries) + s.stop.Stop() + } else { + log.Println("Retrying") + continue + } + } + } + break + } } } -func (s *Sync) ensureChannelOwnership() error { - channels, err := s.daemon.ChannelListMine() - if err != nil { - return err - } else if channels == nil { - return errors.New("no channels") - } - - for _, channel := range *channels { - if channel.Name == s.LbryChannelName { - return nil - } - } - - resolveResp, err := s.daemon.Resolve(s.LbryChannelName) - if err != nil { - return err - } - - channel := (*resolveResp)[s.LbryChannelName] - channelBidAmount := channelClaimAmount - - channelNotFound := channel.Error != nil && strings.Contains(*(channel.Error), "cannot be resolved") - if !channelNotFound { - if !s.TakeOverExistingChannel { - return errors.New("Channel exists and we don't own it. Pick another channel.") - } - log.Println("Channel exists and we don't own it. Outbidding existing claim.") - channelBidAmount, _ = channel.Certificate.Amount.Add(decimal.NewFromFloat(channelClaimAmount)).Float64() - } - - _, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) - if err != nil { - return err - } - - // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for channel claim to go through") - time.Sleep(wait) - - return nil -} - -func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, queueStopChan *chan struct{}) error { +func (s *Sync) enqueueVideos() error { client := &http.Client{ Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, } @@ -458,7 +223,7 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, return errors.WrapPrefix(err, "error creating YouTube service", 0) } - response, err := service.Channels.List("contentDetails").Id(channelID).Do() + response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do() if err != nil { return errors.WrapPrefix(err, "error getting channels", 0) } @@ -476,7 +241,7 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, return errors.New("no channel playlist") } - videos := []video{} + var videos []video nextPageToken := "" for { @@ -496,26 +261,13 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, for _, item := range playlistResponse.Items { // todo: there's thumbnail info here. why did we need lambda??? - publishedAt, err := time.Parse(time.RFC3339Nano, item.Snippet.PublishedAt) - if err != nil { - return errors.WrapPrefix(err, "failed to parse time", 0) - } // normally we'd send the video into the channel here, but youtube api doesn't have sorting // so we have to get ALL the videos, then sort them, then send them in - videos = append(videos, video{ - id: item.Snippet.ResourceId.VideoId, - channelID: channelID, - title: item.Snippet.Title, - description: item.Snippet.Description, - channelTitle: item.Snippet.ChannelTitle, - playlistPosition: item.Snippet.Position, - publishedAt: publishedAt, - dir: s.videoDirectory, - }) + videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet)) } - log.Infoln("Got info for " + strconv.Itoa(len(videos)) + " videos from youtube API") + log.Infof("Got info for %d videos from youtube API", len(videos)) nextPageToken = playlistResponse.NextPageToken if nextPageToken == "" { @@ -529,8 +281,14 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, Enqueue: for _, v := range videos { select { - case *videoChan <- v: - case <-*queueStopChan: + case <-s.stop.Chan(): + break Enqueue + default: + } + + select { + case s.queue <- v: + case <-s.stop.Chan(): break Enqueue } } @@ -539,141 +297,35 @@ Enqueue: } func (s *Sync) processVideo(v video) error { - log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)") + log.Println("Processing " + v.IDAndNum()) defer func(start time.Time) { - log.Println(v.id + " took " + time.Since(start).String()) + log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - conn := s.redisPool.Get() - defer conn.Close() - - alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, v.id)) - if err != nil && err != redis.ErrNil { - return errors.WrapPrefix(err, "redis error", 0) - + alreadyPublished, err := s.db.IsPublished(v.ID()) + if err != nil { + return err } - if alreadyPublished == redisSyncedVal { - log.Println(v.id + " already published") + + if alreadyPublished { + log.Println(v.ID() + " already published") return nil } - //download and thumbnail can be done in parallel - err = downloadVideo(v) + err = v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName) if err != nil { - return errors.WrapPrefix(err, "download error", 0) + return err } - err = triggerThumbnailSave(v.id) + err = s.db.SetPublished(v.ID()) if err != nil { - return errors.WrapPrefix(err, "thumbnail error", 0) - } - - err = s.publish(v, conn) - if err != nil { - return errors.WrapPrefix(err, "publish error", 0) + return err } return nil } -func downloadVideo(v video) error { - verbose := false - videoPath := v.getFilename() - - _, err := os.Stat(videoPath) - if err != nil && !os.IsNotExist(err) { - return err - } else if err == nil { - log.Println(v.id + " already exists at " + videoPath) - return nil - } - - downloader := ytdl.NewYoutube(verbose) - err = downloader.DecodeURL("https://www.youtube.com/watch?v=" + v.id) - if err != nil { - return err - } - err = downloader.StartDownload(videoPath) - if err != nil { - return err - } - log.Debugln("Downloaded " + v.id) - return nil -} - -func triggerThumbnailSave(videoID string) error { - client := &http.Client{Timeout: 30 * time.Second} - - params, err := json.Marshal(map[string]string{"videoid": videoID}) - if err != nil { - return err - } - - request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params)) - if err != nil { - return err - } - - response, err := client.Do(request) - if err != nil { - return err - } - defer response.Body.Close() - - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return err - } - - var decoded struct { - error int `json:"error"` - url string `json:"url,omitempty"` - message string `json:"message,omitempty"` - } - err = json.Unmarshal(contents, &decoded) - if err != nil { - return err - } - - if decoded.error != 0 { - return errors.New("error creating thumbnail: " + decoded.message) - } - - log.Debugln("Created thumbnail for " + videoID) - - return nil -} - -func strPtr(s string) *string { return &s } - -func (s *Sync) publish(v video, conn redis.Conn) error { - options := jsonrpc.PublishOptions{ - Title: &v.title, - Author: &v.channelTitle, - Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id), - Language: strPtr("en"), - ClaimAddress: &s.claimAddress, - Thumbnail: strPtr("http://berk.ninja/thumbnails/" + v.id), - License: strPtr("Copyrighted (contact author)"), - } - if s.LbryChannelName != "" { - options.ChannelName = &s.LbryChannelName - } - - _, err := s.daemon.Publish(v.getClaimName(), v.getFilename(), publishAmount, options) - if err != nil { - return err - } - - _, err = redis.Bool(conn.Do("HSET", redisHashKey, v.id, redisSyncedVal)) - if err != nil { - return errors.New("redis error: " + err.Error()) - } - - return nil -} - -func (s *Sync) startDaemonViaSystemd() error { +func startDaemonViaSystemd() error { err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run() if err != nil { return errors.New(err) @@ -681,7 +333,7 @@ func (s *Sync) startDaemonViaSystemd() error { return nil } -func (s *Sync) stopDaemonViaSystemd() error { +func stopDaemonViaSystemd() error { err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run() if err != nil { return errors.New(err)