package ytsync import ( "bytes" "encoding/json" "io/ioutil" "net/http" "os" "os/exec" "sort" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/lbryio/lbry.go/jsonrpc" "github.com/garyburd/redigo/redis" "github.com/go-errors/errors" ytdl "github.com/kkdai/youtube" "github.com/lbryio/lbry.go/lbrycrd" "github.com/shopspring/decimal" log "github.com/sirupsen/logrus" "google.golang.org/api/googleapi/transport" "google.golang.org/api/youtube/v3" ) const ( redisHashKey = "ytsync" redisSyncedVal = "t" channelClaimAmount = 0.01 publishAmount = 0.01 ) // Sync stores the options that control how syncing happens type Sync struct { YoutubeAPIKey string YoutubeChannelID string LbryChannelName string StopOnError bool MaxTries int ConcurrentVideos int TakeOverExistingChannel bool daemon *jsonrpc.Client claimAddress string videoDirectory string redisPool *redis.Pool } func (s *Sync) initDaemon() { if s.daemon == nil { s.daemon = jsonrpc.NewClient("") log.Infoln("Waiting for daemon to finish starting...") for { _, err := s.daemon.WalletBalance() if err == nil { break } time.Sleep(2 * time.Second) } } } func (s *Sync) init() error { var err error s.redisPool = &redis.Pool{ MaxIdle: 3, IdleTimeout: 5 * time.Minute, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, TestOnBorrow: func(c redis.Conn, t time.Time) error { if time.Since(t) < time.Minute { return nil } _, err := c.Do("PING") return err }, } s.videoDirectory, err = ioutil.TempDir("", "ytsync") if err != nil { return errors.Wrap(err, 0) } s.initDaemon() address, err := s.daemon.WalletUnusedAddress() if err != nil { return err } else if address == nil { return errors.New("could not get unused address") } s.claimAddress = string(*address) if s.claimAddress == "" { return errors.New("found blank claim address") } err = s.ensureEnoughUTXOs() if err != nil { return err } if s.LbryChannelName != "" { err = s.ensureChannelOwnership() if err != nil { return err } } return nil } func (s *Sync) CountVideos() (uint64, error) { client := &http.Client{ Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, } service, err := youtube.New(client) if err != nil { return 0, errors.WrapPrefix(err, "error creating YouTube service", 0) } response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do() if err != nil { return 0, errors.WrapPrefix(err, "error getting channels", 0) } if len(response.Items) < 1 { return 0, errors.New("youtube channel not found") } return response.Items[0].Statistics.VideoCount, nil } func (s *Sync) FullCycle() error { if os.Getenv("HOME") == "" { return errors.New("no $HOME env var found") } newChannel := true lbryumDir := os.Getenv("HOME") + "/.lbryum" walletDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) if _, err := os.Stat(walletDir); !os.IsNotExist(err) { newChannel = false err = os.Rename(walletDir, lbryumDir) if err != nil { return errors.Wrap(err, 0) } log.Println("Continuing previous upload") } err := s.startDaemonViaSystemd() if err != nil { return err } if newChannel { s.initDaemon() addressResp, err := s.daemon.WalletUnusedAddress() if err != nil { return err } else if addressResp == nil { return errors.New("no response") } address := string(*addressResp) count, err := s.CountVideos() if err != nil { return err } initialAmount := float64(count)*publishAmount + channelClaimAmount + 5 // +5 for fees and such log.Printf("Loading wallet with %f initial credits", initialAmount) lbrycrdd, err := lbrycrd.NewWithDefaultURL() if err != nil { return err } lbrycrdd.SimpleSend(address, initialAmount) //lbrycrdd.SendWithSplit(address, initialAmount, 50) wait := 15 * time.Second log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction") time.Sleep(wait) log.Println("Waiting for transaction to be confirmed") err = s.waitUntilUTXOsConfirmed() if err != nil { return err } } err = s.Go() if err != nil { return err } // wait for reflection to finish??? wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing log.Println("Waiting " + wait.String() + " to finish reflecting everything") time.Sleep(wait) log.Printf("Stopping daemon") err = s.stopDaemonViaSystemd() if err != nil { return err } err = os.Rename(lbryumDir, walletDir) if err != nil { return errors.Wrap(err, 0) } return nil } func (s *Sync) Go() error { var err error err = s.init() if err != nil { return err } var wg sync.WaitGroup videoQueue := make(chan video) queueStopChan := make(chan struct{}) sendStopEnqueuing := sync.Once{} var videoErrored atomic.Value videoErrored.Store(false) if s.StopOnError { log.Println("Will stop publishing if an error is detected") } for i := 0; i < s.ConcurrentVideos; i++ { go func() { wg.Add(1) defer wg.Done() for { v, more := <-videoQueue if !more { return } if s.StopOnError && videoErrored.Load().(bool) { log.Println("Video errored. Exiting") return } log.Println("========================================") tryCount := 0 for { tryCount++ err := s.processVideo(v) if err != nil { log.Errorln("error processing video: " + err.Error()) if s.StopOnError { videoErrored.Store(true) sendStopEnqueuing.Do(func() { queueStopChan <- struct{}{} }) } else if s.MaxTries > 1 { if strings.Contains(err.Error(), "non 200 status code received") || strings.Contains(err.Error(), " reason: 'This video contains content from") { log.Println("This error should not be retried at all") } else if tryCount >= s.MaxTries { log.Println("Video failed after " + strconv.Itoa(s.MaxTries) + " retries, exiting") videoErrored.Store(true) sendStopEnqueuing.Do(func() { queueStopChan <- struct{}{} }) } else { log.Println("Retrying") continue } } } break } } }() } err = s.enqueueVideosFromChannel(s.YoutubeChannelID, &videoQueue, &queueStopChan) close(videoQueue) wg.Wait() return err } func (s *Sync) ensureEnoughUTXOs() error { utxolist, err := s.daemon.UTXOList() if err != nil { return err } else if utxolist == nil { return errors.New("no response") } target := 50 count := 0 for _, utxo := range *utxolist { if !utxo.IsClaim && !utxo.IsSupport && !utxo.IsUpdate && utxo.Amount.Cmp(decimal.New(0, 0)) == 1 { count++ } } if count < target { newAddresses := target - count balance, err := s.daemon.WalletBalance() if err != nil { return err } else if balance == nil { return errors.New("no response") } log.Println("balance is " + decimal.Decimal(*balance).String()) amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target))) log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses) prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true) if err != nil { return err } else if prefillTx == nil { return errors.New("no response") } else if !prefillTx.Complete || !prefillTx.Broadcast { return errors.New("failed to prefill addresses") } wait := 15 * time.Second log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses") time.Sleep(wait) log.Println("Creating UTXOs and waiting for them to be confirmed") err = s.waitUntilUTXOsConfirmed() if err != nil { return err } } return nil } func (s *Sync) waitUntilUTXOsConfirmed() error { for { r, err := s.daemon.UTXOList() if err != nil { return err } else if r == nil { return errors.New("no response") } allConfirmed := true if len(*r) < 1 { allConfirmed = false } else { for _, utxo := range *r { if utxo.Height == 0 { allConfirmed = false break } } } if allConfirmed { return nil } wait := 30 * time.Second log.Println("Waiting " + wait.String() + "...") time.Sleep(wait) } } func (s *Sync) ensureChannelOwnership() error { channels, err := s.daemon.ChannelListMine() if err != nil { return err } else if channels == nil { return errors.New("no channels") } for _, channel := range *channels { if channel.Name == s.LbryChannelName { return nil } } resolveResp, err := s.daemon.Resolve(s.LbryChannelName) if err != nil { return err } channel := (*resolveResp)[s.LbryChannelName] channelBidAmount := channelClaimAmount channelNotFound := channel.Error != nil && strings.Contains(*(channel.Error), "cannot be resolved") if !channelNotFound { if !s.TakeOverExistingChannel { return errors.New("Channel exists and we don't own it. Pick another channel.") } log.Println("Channel exists and we don't own it. Outbidding existing claim.") channelBidAmount, _ = channel.Certificate.Amount.Add(decimal.NewFromFloat(channelClaimAmount)).Float64() } _, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) if err != nil { return err } // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" wait := 15 * time.Second log.Println("Waiting " + wait.String() + " for channel claim to go through") time.Sleep(wait) return nil } func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, queueStopChan *chan struct{}) error { client := &http.Client{ Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, } service, err := youtube.New(client) if err != nil { return errors.WrapPrefix(err, "error creating YouTube service", 0) } response, err := service.Channels.List("contentDetails").Id(channelID).Do() if err != nil { return errors.WrapPrefix(err, "error getting channels", 0) } if len(response.Items) < 1 { return errors.New("youtube channel not found") } if response.Items[0].ContentDetails.RelatedPlaylists == nil { return errors.New("no related playlists") } playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads if playlistID == "" { return errors.New("no channel playlist") } videos := []video{} nextPageToken := "" for { req := service.PlaylistItems.List("snippet"). PlaylistId(playlistID). MaxResults(50). PageToken(nextPageToken) playlistResponse, err := req.Do() if err != nil { return errors.WrapPrefix(err, "error getting playlist items", 0) } if len(playlistResponse.Items) < 1 { return errors.New("playlist items not found") } for _, item := range playlistResponse.Items { // todo: there's thumbnail info here. why did we need lambda??? publishedAt, err := time.Parse(time.RFC3339Nano, item.Snippet.PublishedAt) if err != nil { return errors.WrapPrefix(err, "failed to parse time", 0) } // normally we'd send the video into the channel here, but youtube api doesn't have sorting // so we have to get ALL the videos, then sort them, then send them in videos = append(videos, video{ id: item.Snippet.ResourceId.VideoId, channelID: channelID, title: item.Snippet.Title, description: item.Snippet.Description, channelTitle: item.Snippet.ChannelTitle, playlistPosition: item.Snippet.Position, publishedAt: publishedAt, dir: s.videoDirectory, }) } log.Infoln("Got info for " + strconv.Itoa(len(videos)) + " videos from youtube API") nextPageToken = playlistResponse.NextPageToken if nextPageToken == "" { break } } sort.Sort(byPublishedAt(videos)) //or sort.Sort(sort.Reverse(byPlaylistPosition(videos))) Enqueue: for _, v := range videos { select { case *videoChan <- v: case <-*queueStopChan: break Enqueue } } return nil } func (s *Sync) processVideo(v video) error { log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)") defer func(start time.Time) { log.Println(v.id + " took " + time.Since(start).String()) }(time.Now()) conn := s.redisPool.Get() defer conn.Close() alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, v.id)) if err != nil && err != redis.ErrNil { return errors.WrapPrefix(err, "redis error", 0) } if alreadyPublished == redisSyncedVal { log.Println(v.id + " already published") return nil } //download and thumbnail can be done in parallel err = downloadVideo(v) if err != nil { return errors.WrapPrefix(err, "download error", 0) } err = triggerThumbnailSave(v.id) if err != nil { return errors.WrapPrefix(err, "thumbnail error", 0) } err = s.publish(v, conn) if err != nil { return errors.WrapPrefix(err, "publish error", 0) } return nil } func downloadVideo(v video) error { verbose := false videoPath := v.getFilename() _, err := os.Stat(videoPath) if err != nil && !os.IsNotExist(err) { return err } else if err == nil { log.Println(v.id + " already exists at " + videoPath) return nil } downloader := ytdl.NewYoutube(verbose) err = downloader.DecodeURL("https://www.youtube.com/watch?v=" + v.id) if err != nil { return err } err = downloader.StartDownload(videoPath) if err != nil { return err } log.Debugln("Downloaded " + v.id) return nil } func triggerThumbnailSave(videoID string) error { client := &http.Client{Timeout: 30 * time.Second} params, err := json.Marshal(map[string]string{"videoid": videoID}) if err != nil { return err } request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params)) if err != nil { return err } response, err := client.Do(request) if err != nil { return err } defer response.Body.Close() contents, err := ioutil.ReadAll(response.Body) if err != nil { return err } var decoded struct { error int `json:"error"` url string `json:"url,omitempty"` message string `json:"message,omitempty"` } err = json.Unmarshal(contents, &decoded) if err != nil { return err } if decoded.error != 0 { return errors.New("error creating thumbnail: " + decoded.message) } log.Debugln("Created thumbnail for " + videoID) return nil } func strPtr(s string) *string { return &s } func (s *Sync) publish(v video, conn redis.Conn) error { options := jsonrpc.PublishOptions{ Title: &v.title, Author: &v.channelTitle, Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id), Language: strPtr("en"), ClaimAddress: &s.claimAddress, Thumbnail: strPtr("http://berk.ninja/thumbnails/" + v.id), License: strPtr("Copyrighted (contact author)"), } if s.LbryChannelName != "" { options.ChannelName = &s.LbryChannelName } _, err := s.daemon.Publish(v.getClaimName(), v.getFilename(), publishAmount, options) if err != nil { return err } _, err = redis.Bool(conn.Do("HSET", redisHashKey, v.id, redisSyncedVal)) if err != nil { return errors.New("redis error: " + err.Error()) } return nil } func (s *Sync) startDaemonViaSystemd() error { err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run() if err != nil { return errors.New(err) } return nil } func (s *Sync) stopDaemonViaSystemd() error { err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run() if err != nil { return errors.New(err) } return nil }