start-to-finish sync of a single channel

This commit is contained in:
Alex Grintsvayg 2017-11-03 08:46:27 -04:00
parent e674f8a215
commit 6a09ae5546

136
ytsync.go
View file

@ -6,6 +6,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os" "os"
"os/exec"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -18,6 +19,7 @@ import (
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"github.com/go-errors/errors" "github.com/go-errors/errors"
ytdl "github.com/kkdai/youtube" ytdl "github.com/kkdai/youtube"
"github.com/lbryio/lbry.go/lbrycrd"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport" "google.golang.org/api/googleapi/transport"
@ -46,6 +48,20 @@ type Sync struct {
redisPool *redis.Pool redisPool *redis.Pool
} }
func (s *Sync) initDaemon() {
if s.daemon == nil {
s.daemon = jsonrpc.NewClient("")
log.Infoln("Waiting for daemon to finish starting...")
for {
_, err := s.daemon.WalletBalance()
if err == nil {
break
}
time.Sleep(2 * time.Second)
}
}
}
func (s *Sync) init() error { func (s *Sync) init() error {
var err error var err error
@ -67,15 +83,15 @@ func (s *Sync) init() error {
return errors.Wrap(err, 0) return errors.Wrap(err, 0)
} }
s.daemon = jsonrpc.NewClient("") s.initDaemon()
addresses, err := s.daemon.WalletList() address, err := s.daemon.WalletUnusedAddress()
if err != nil { if err != nil {
return err return err
} else if addresses == nil || len(*addresses) == 0 { } else if address == nil {
return errors.New("could not find an address in wallet") return errors.New("could not get unused address")
} }
s.claimAddress = (*addresses)[0] s.claimAddress = string(*address)
if s.claimAddress == "" { if s.claimAddress == "" {
return errors.New("found blank claim address") return errors.New("found blank claim address")
} }
@ -117,6 +133,81 @@ func (s *Sync) CountVideos() (uint64, error) {
return response.Items[0].Statistics.VideoCount, nil return response.Items[0].Statistics.VideoCount, nil
} }
func (s *Sync) FullCycle() error {
walletDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
if _, err := os.Stat(walletDir); !os.IsNotExist(err) {
return errors.New("channel has already been uploaded, and wallet is in " + walletDir)
}
err := s.startDaemonViaSystemd()
if err != nil {
return err
}
s.initDaemon()
addressResp, err := s.daemon.WalletUnusedAddress()
if err != nil {
return err
} else if addressResp == nil {
return errors.New("no response")
}
address := string(*addressResp)
count, err := s.CountVideos()
if err != nil {
return err
}
initialAmount := float64(count)*publishAmount + channelClaimAmount + 5 // +5 for fees and such
log.Printf("Loading wallet with %f initial credits", initialAmount)
lbrycrdd, err := lbrycrd.NewWithDefaultURL()
if err != nil {
return err
}
lbrycrdd.SimpleSend(address, initialAmount)
//lbrycrdd.SendWithSplit(address, initialAmount, 50)
wait := 15 * time.Second
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction")
time.Sleep(wait)
log.Println("Waiting for transaction to be confirmed")
err = s.waitUntilUTXOsConfirmed()
if err != nil {
return err
}
err = s.Go()
if err != nil {
return err
}
// wait for reflection to finish???
wait = 15 * time.Second // should bump this up to a few min, but keeping it low for testing
log.Println("Waiting " + wait.String() + " to finish reflecting everything")
time.Sleep(wait)
log.Printf("Stopping daemon")
err = s.stopDaemonViaSystemd()
if err != nil {
return err
}
// move wallet
if os.Getenv("HOME") == "" {
return errors.New("could not move lbryum dir - no $HOME var found")
}
lbryumDir := os.Getenv("HOME") + "/.lbryum"
err = os.Rename(lbryumDir, walletDir)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (s *Sync) Go() error { func (s *Sync) Go() error {
var err error var err error
@ -221,6 +312,8 @@ func (s *Sync) ensureEnoughUTXOs() error {
return errors.New("no response") return errors.New("no response")
} }
log.Println("balance is " + decimal.Decimal(*balance).String())
amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target))) amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target)))
log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses) log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses)
prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true) prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true)
@ -237,7 +330,10 @@ func (s *Sync) ensureEnoughUTXOs() error {
time.Sleep(wait) time.Sleep(wait)
log.Println("Creating UTXOs and waiting for them to be confirmed") log.Println("Creating UTXOs and waiting for them to be confirmed")
s.waitUntilUTXOsConfirmed() err = s.waitUntilUTXOsConfirmed()
if err != nil {
return err
}
} }
return nil return nil
@ -253,10 +349,14 @@ func (s *Sync) waitUntilUTXOsConfirmed() error {
} }
allConfirmed := true allConfirmed := true
for _, utxo := range *r { if len(*r) < 1 {
if utxo.Height == 0 { allConfirmed = false
allConfirmed = false } else {
break for _, utxo := range *r {
if utxo.Height == 0 {
allConfirmed = false
break
}
} }
} }
@ -532,3 +632,19 @@ func (s *Sync) publish(v video, conn redis.Conn) error {
return nil return nil
} }
func (s *Sync) startDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run()
if err != nil {
return errors.New(err)
}
return nil
}
func (s *Sync) stopDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run()
if err != nil {
return errors.New(err)
}
return nil
}