ytsync/manager/setup.go
2021-04-12 23:31:37 +02:00

511 lines
14 KiB
Go

package manager
import (
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/ytapi"
"github.com/lbryio/ytsync/v5/tags_manager"
"github.com/lbryio/ytsync/v5/thumbs"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
)
func (s *Sync) enableAddressReuse() error {
accountsResponse, err := s.daemon.AccountList(1, 50)
if err != nil {
return errors.Err(err)
}
accounts := make([]jsonrpc.Account, 0, len(accountsResponse.Items))
ledger := "lbc_mainnet"
if logUtils.IsRegTest() {
ledger = "lbc_regtest"
}
for _, a := range accountsResponse.Items {
if *a.Ledger == ledger {
accounts = append(accounts, a)
}
}
for _, a := range accounts {
_, err = s.daemon.AccountSet(a.ID, jsonrpc.AccountSettings{
ChangeMaxUses: util.PtrToInt(1000),
ReceivingMaxUses: util.PtrToInt(100),
})
if err != nil {
return errors.Err(err)
}
}
return nil
}
func (s *Sync) walletSetup() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("walletSetup").Add(time.Since(start))
}(start)
//prevent unnecessary concurrent execution and publishing while refilling/reallocating UTXOs
s.walletMux.Lock()
defer s.walletMux.Unlock()
err := s.ensureChannelOwnership()
if err != nil {
return err
}
balanceResp, err := s.daemon.AccountBalance(nil)
if err != nil {
return err
} else if balanceResp == nil {
return errors.Err("no response")
}
balance, err := strconv.ParseFloat(balanceResp.Available.String(), 64)
if err != nil {
return errors.Err(err)
}
log.Debugf("Starting balance is %.4f", balance)
videosOnYoutube := int(s.DbChannelData.TotalVideos)
log.Debugf("Source channel has %d videos", videosOnYoutube)
if videosOnYoutube == 0 {
return nil
}
s.syncedVideosMux.RLock()
publishedCount := 0
notUpgradedCount := 0
failedCount := 0
for _, sv := range s.syncedVideos {
if sv.Published {
publishedCount++
if sv.MetadataVersion < 2 {
notUpgradedCount++
}
} else {
failedCount++
}
}
s.syncedVideosMux.RUnlock()
log.Debugf("We already allocated credits for %d published videos and %d failed videos", publishedCount, failedCount)
if videosOnYoutube > s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers) {
videosOnYoutube = s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers)
}
unallocatedVideos := videosOnYoutube - (publishedCount + failedCount)
channelFee := channelClaimAmount
channelAlreadyClaimed := s.DbChannelData.ChannelClaimID != ""
if channelAlreadyClaimed {
channelFee = 0.0
}
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelFee
if s.Manager.CliFlags.UpgradeMetadata {
requiredBalance += float64(notUpgradedCount) * 0.001
}
refillAmount := 0.0
if balance < requiredBalance || balance < minimumAccountBalance {
refillAmount = math.Max(math.Max(requiredBalance-balance, minimumAccountBalance-balance), minimumRefillAmount)
}
if s.Manager.CliFlags.Refill > 0 {
refillAmount += float64(s.Manager.CliFlags.Refill)
}
if refillAmount > 0 {
err := s.addCredits(refillAmount)
if err != nil {
return errors.Err(err)
}
}
claimAddress, err := s.daemon.AddressList(nil, nil, 1, 20)
if err != nil {
return err
} else if claimAddress == nil {
return errors.Err("could not get an address")
}
if s.DbChannelData.PublishAddress.Address == "" || !s.shouldTransfer() {
s.DbChannelData.PublishAddress.Address = string(claimAddress.Items[0].Address)
s.DbChannelData.PublishAddress.IsMine = true
}
if s.DbChannelData.PublishAddress.Address == "" {
return errors.Err("found blank claim address")
}
err = s.ensureEnoughUTXOs()
if err != nil {
return err
}
return nil
}
func (s *Sync) getDefaultAccount() (string, error) {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("getDefaultAccount").Add(time.Since(start))
}(start)
if s.defaultAccountID == "" {
accountsResponse, err := s.daemon.AccountList(1, 50)
if err != nil {
return "", errors.Err(err)
}
ledger := "lbc_mainnet"
if logUtils.IsRegTest() {
ledger = "lbc_regtest"
}
for _, a := range accountsResponse.Items {
if *a.Ledger == ledger {
if a.IsDefault {
s.defaultAccountID = a.ID
break
}
}
}
if s.defaultAccountID == "" {
return "", errors.Err("No default account found")
}
}
return s.defaultAccountID, nil
}
func (s *Sync) ensureEnoughUTXOs() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("ensureEnoughUTXOs").Add(time.Since(start))
}(start)
defaultAccount, err := s.getDefaultAccount()
if err != nil {
return err
}
utxolist, err := s.daemon.UTXOList(&defaultAccount, 1, 10000)
if err != nil {
return err
} else if utxolist == nil {
return errors.Err("no response")
}
target := 40
slack := int(float32(0.1) * float32(target))
count := 0
confirmedCount := 0
for _, utxo := range utxolist.Items {
amount, _ := strconv.ParseFloat(utxo.Amount, 64)
if utxo.IsMyOutput && utxo.Type == "payment" && amount > 0.001 {
if utxo.Confirmations > 0 {
confirmedCount++
}
count++
}
}
log.Infof("utxo count: %d (%d confirmed)", count, confirmedCount)
UTXOWaitThreshold := 16
if count < target-slack {
balance, err := s.daemon.AccountBalance(&defaultAccount)
if err != nil {
return err
} else if balance == nil {
return errors.Err("no response")
}
balanceAmount, err := strconv.ParseFloat(balance.Available.String(), 64)
if err != nil {
return errors.Err(err)
}
//this is dumb but sometimes the balance is negative and it breaks everything, so let's check again
if balanceAmount < 0 {
log.Infof("negative balance of %.2f found. Waiting to retry...", balanceAmount)
time.Sleep(10 * time.Second)
balanceAmount, err = strconv.ParseFloat(balance.Available.String(), 64)
if err != nil {
return errors.Err(err)
}
}
maxUTXOs := uint64(500)
desiredUTXOCount := uint64(math.Floor((balanceAmount) / 0.1))
if desiredUTXOCount > maxUTXOs {
desiredUTXOCount = maxUTXOs
}
if desiredUTXOCount < uint64(confirmedCount) {
return nil
}
availableBalance, _ := balance.Available.Float64()
log.Infof("Splitting balance of %.3f evenly between %d UTXOs", availableBalance, desiredUTXOCount)
broadcastFee := 0.1
prefillTx, err := s.daemon.AccountFund(defaultAccount, defaultAccount, fmt.Sprintf("%.4f", balanceAmount-broadcastFee), desiredUTXOCount, false)
if err != nil {
return err
} else if prefillTx == nil {
return errors.Err("no response")
}
if confirmedCount < UTXOWaitThreshold {
err = s.waitForNewBlock()
if err != nil {
return err
}
}
} else if confirmedCount < UTXOWaitThreshold {
log.Println("Waiting for previous txns to confirm")
err := s.waitForNewBlock()
if err != nil {
return err
}
}
return nil
}
func (s *Sync) waitForNewBlock() error {
defer func(start time.Time) { timing.TimedComponent("waitForNewBlock").Add(time.Since(start)) }(time.Now())
log.Printf("regtest: %t, docker: %t", logUtils.IsRegTest(), logUtils.IsUsingDocker())
status, err := s.daemon.Status()
if err != nil {
return err
}
for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 {
time.Sleep(5 * time.Second)
status, err = s.daemon.Status()
if err != nil {
return err
}
}
currentBlock := status.Wallet.Blocks
for i := 0; status.Wallet.Blocks <= currentBlock; i++ {
if i%3 == 0 {
log.Printf("Waiting for new block (%d)...", currentBlock+1)
}
if logUtils.IsRegTest() && logUtils.IsUsingDocker() {
err = s.GenerateRegtestBlock()
if err != nil {
return err
}
}
time.Sleep(10 * time.Second)
status, err = s.daemon.Status()
if err != nil {
return err
}
}
time.Sleep(5 * time.Second)
return nil
}
func (s *Sync) GenerateRegtestBlock() error {
lbrycrd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn)
if err != nil {
return errors.Prefix("error getting lbrycrd client: ", err)
}
txs, err := lbrycrd.Generate(1)
if err != nil {
return errors.Prefix("error generating new block: ", err)
}
for _, tx := range txs {
log.Info("Generated tx: ", tx.String())
}
return nil
}
func (s *Sync) ensureChannelOwnership() error {
defer func(start time.Time) { timing.TimedComponent("ensureChannelOwnership").Add(time.Since(start)) }(time.Now())
if s.DbChannelData.DesiredChannelName == "" {
return errors.Err("no channel name set")
}
channels, err := s.daemon.ChannelList(nil, 1, 500, nil, false)
if err != nil {
return err
} else if channels == nil {
return errors.Err("no channel response")
}
var channelToUse *jsonrpc.Transaction
if len((*channels).Items) > 0 {
if s.DbChannelData.ChannelClaimID == "" {
return errors.Err("this channel does not have a recorded claimID in the database. To prevent failures, updates are not supported until an entry is manually added in the database")
}
for _, c := range (*channels).Items {
log.Debugf("checking listed channel %s (%s)", c.ClaimID, c.Name)
if c.ClaimID != s.DbChannelData.ChannelClaimID {
continue
}
if c.Name != s.DbChannelData.DesiredChannelName {
return errors.Err("the channel in the wallet is different than the channel in the database")
}
channelToUse = &c
break
}
if channelToUse == nil {
return errors.Err("this wallet has channels but not a single one is ours! Expected claim_id: %s (%s)", s.DbChannelData.ChannelClaimID, s.DbChannelData.DesiredChannelName)
}
} else if s.DbChannelData.TransferState == shared.TransferStateComplete {
return errors.Err("the channel was transferred but appears to have been abandoned!")
} else if s.DbChannelData.ChannelClaimID != "" {
return errors.Err("the database has a channel recorded (%s) but nothing was found in our control", s.DbChannelData.ChannelClaimID)
}
channelUsesOldMetadata := false
if channelToUse != nil {
channelUsesOldMetadata = channelToUse.Value.GetThumbnail() == nil
if !channelUsesOldMetadata {
return nil
}
}
channelBidAmount := channelClaimAmount
balanceResp, err := s.daemon.AccountBalance(nil)
if err != nil {
return err
} else if balanceResp == nil {
return errors.Err("no response")
}
balance, err := decimal.NewFromString(balanceResp.Available.String())
if err != nil {
return errors.Err(err)
}
if balance.LessThan(decimal.NewFromFloat(channelBidAmount)) {
err = s.addCredits(channelBidAmount + 0.3)
if err != nil {
return err
}
}
channelInfo, err := ytapi.ChannelInfo(s.DbChannelData.ChannelId)
if err != nil {
if strings.Contains(err.Error(), "invalid character 'e' looking for beginning of value") {
logUtils.SendInfoToSlack("failed to get channel data for %s. Waiting 1 minute to retry", s.DbChannelData.ChannelId)
time.Sleep(1 * time.Minute)
channelInfo, err = ytapi.ChannelInfo(s.DbChannelData.ChannelId)
if err != nil {
return err
}
} else {
return err
}
}
thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig())
if err != nil {
return err
}
var bannerURL *string
if channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails != nil {
bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL,
"banner-"+s.DbChannelData.ChannelId,
*s.Manager.AwsConfigs.GetS3AWSConfig(),
)
if err != nil {
return err
}
bannerURL = &bURL
}
var languages []string = nil
//we don't have this data without the API
//if channelInfo.DefaultLanguage != "" {
// if channelInfo.DefaultLanguage == "iw" {
// channelInfo.DefaultLanguage = "he"
// }
// languages = []string{channelInfo.DefaultLanguage}
//}
var locations []jsonrpc.Location = nil
if channelInfo.Topbar.DesktopTopbarRenderer.CountryCode != "" {
locations = []jsonrpc.Location{{Country: &channelInfo.Topbar.DesktopTopbarRenderer.CountryCode}}
}
var c *jsonrpc.TransactionSummary
claimCreateOptions := jsonrpc.ClaimCreateOptions{
Title: &channelInfo.Microformat.MicroformatDataRenderer.Title,
Description: &channelInfo.Metadata.ChannelMetadataRenderer.Description,
Tags: tags_manager.GetTagsForChannel(s.DbChannelData.ChannelId),
Languages: languages,
Locations: locations,
ThumbnailURL: &thumbnailURL,
}
if channelUsesOldMetadata {
if s.DbChannelData.TransferState <= 1 {
c, err = s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, jsonrpc.ChannelUpdateOptions{
ClearTags: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true),
ClearLanguages: util.PtrToBool(true),
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
},
})
} else {
logUtils.SendInfoToSlack("%s (%s) has a channel with old metadata but isn't in our control anymore. Ignoring", s.DbChannelData.DesiredChannelName, s.DbChannelData.ChannelClaimID)
return nil
}
} else {
c, err = s.daemon.ChannelCreate(s.DbChannelData.DesiredChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
})
}
if err != nil {
return err
}
s.DbChannelData.ChannelClaimID = c.Outputs[0].ClaimID
return s.Manager.ApiConfig.SetChannelClaimID(s.DbChannelData.ChannelId, s.DbChannelData.ChannelClaimID)
}
func (s *Sync) addCredits(amountToAdd float64) error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("addCredits").Add(time.Since(start))
}(start)
log.Printf("Adding %f credits", amountToAdd)
lbrycrdd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn)
if err != nil {
return err
}
defaultAccount, err := s.getDefaultAccount()
if err != nil {
return err
}
addressResp, err := s.daemon.AddressUnused(&defaultAccount)
if err != nil {
return err
} else if addressResp == nil {
return errors.Err("no response")
}
address := string(*addressResp)
_, err = lbrycrdd.SimpleSend(address, amountToAdd)
if err != nil {
return err
}
wait := 15 * time.Second
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction")
time.Sleep(wait)
return nil
}