diff --git a/ytsync.go b/ytsync.go index 075fb76..94e0170 100644 --- a/ytsync.go +++ b/ytsync.go @@ -18,6 +18,7 @@ import ( "github.com/garyburd/redigo/redis" "github.com/go-errors/errors" ytdl "github.com/kkdai/youtube" + "github.com/shopspring/decimal" log "github.com/sirupsen/logrus" "google.golang.org/api/googleapi/transport" "google.golang.org/api/youtube/v3" @@ -63,7 +64,7 @@ func (s *Sync) init() error { s.videoDirectory, err = ioutil.TempDir("", "ytsync") if err != nil { - return err + return errors.Wrap(err, 0) } s.daemon = jsonrpc.NewClient("") @@ -79,6 +80,11 @@ func (s *Sync) init() error { return errors.New("found blank claim address") } + err = s.ensureEnoughUTXOs() + if err != nil { + return err + } + if s.LbryChannelName != "" { err = s.ensureChannelOwnership() if err != nil { @@ -89,6 +95,28 @@ func (s *Sync) init() error { return nil } +func (s *Sync) CountVideos() (uint64, error) { + client := &http.Client{ + Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, + } + + service, err := youtube.New(client) + if err != nil { + return 0, errors.WrapPrefix(err, "error creating YouTube service", 0) + } + + response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do() + if err != nil { + return 0, errors.WrapPrefix(err, "error getting channels", 0) + } + + if len(response.Items) < 1 { + return 0, errors.New("youtube channel not found") + } + + return response.Items[0].Statistics.VideoCount, nil +} + func (s *Sync) Go() error { var err error @@ -143,7 +171,11 @@ func (s *Sync) Go() error { strings.Contains(err.Error(), " reason: 'This video contains content from") { log.Println("This error should not be retried at all") } else if tryCount >= s.MaxTries { - log.Println("Video failed after " + strconv.Itoa(s.MaxTries) + " retries, moving on") + log.Println("Video failed after " + strconv.Itoa(s.MaxTries) + " retries, exiting") + videoErrored.Store(true) + sendStopEnqueuing.Do(func() { + queueStopChan <- struct{}{} + }) } else { log.Println("Retrying") continue @@ -162,6 +194,82 @@ func (s *Sync) Go() error { return err } +func (s *Sync) ensureEnoughUTXOs() error { + utxolist, err := s.daemon.UTXOList() + if err != nil { + return err + } else if utxolist == nil { + return errors.New("no response") + } + + target := 50 + count := 0 + + for _, utxo := range *utxolist { + if !utxo.IsClaim && !utxo.IsSupport && !utxo.IsUpdate && utxo.Amount.Cmp(decimal.New(0, 0)) == 1 { + count++ + } + } + + if count < target { + newAddresses := target - count + + balance, err := s.daemon.WalletBalance() + if err != nil { + return err + } else if balance == nil { + return errors.New("no response") + } + + amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target))) + log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses) + prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true) + if err != nil { + return err + } else if prefillTx == nil { + return errors.New("no response") + } else if !prefillTx.Complete || !prefillTx.Broadcast { + return errors.New("failed to prefill addresses") + } + + wait := 15 * time.Second + log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses") + time.Sleep(wait) + + log.Println("Creating UTXOs and waiting for them to be confirmed") + s.waitUntilUTXOsConfirmed() + } + + return nil +} + +func (s *Sync) waitUntilUTXOsConfirmed() error { + for { + r, err := s.daemon.UTXOList() + if err != nil { + return err + } else if r == nil { + return errors.New("no response") + } + + allConfirmed := true + for _, utxo := range *r { + if utxo.Height == 0 { + allConfirmed = false + break + } + } + + if allConfirmed { + return nil + } + + wait := 30 * time.Second + log.Println("Waiting " + wait.String() + "...") + time.Sleep(wait) + } +} + func (s *Sync) ensureChannelOwnership() error { channels, err := s.daemon.ChannelListMine() if err != nil { @@ -292,6 +400,9 @@ Enqueue: func (s *Sync) processVideo(v video) error { log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)") + defer func(start time.Time) { + log.Println(v.id + " took " + time.Since(start).String()) + }(time.Now()) conn := s.redisPool.Get() defer conn.Close()