346 lines
8.8 KiB
Go
346 lines
8.8 KiB
Go
package manager
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
|
|
"github.com/lbryio/lbry.go/v2/extras/stop"
|
|
"github.com/lbryio/lbry.go/v2/extras/util"
|
|
"github.com/lbryio/ytsync/v5/sdk"
|
|
"github.com/lbryio/ytsync/v5/timing"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func waitConfirmations(s *Sync) error {
|
|
start := time.Now()
|
|
defer func(start time.Time) {
|
|
timing.TimedComponent("waitConfirmations").Add(time.Since(start))
|
|
}(start)
|
|
defaultAccount, err := s.getDefaultAccount()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
allConfirmed := false
|
|
waitCount := 0
|
|
waiting:
|
|
for !allConfirmed && waitCount < 2 {
|
|
utxolist, err := s.daemon.UTXOList(&defaultAccount, 1, 10000)
|
|
if err != nil {
|
|
return err
|
|
} else if utxolist == nil {
|
|
return errors.Err("no response")
|
|
}
|
|
|
|
for _, utxo := range utxolist.Items {
|
|
if utxo.Confirmations <= 0 {
|
|
err = s.waitForNewBlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
waitCount++
|
|
continue waiting
|
|
}
|
|
}
|
|
allConfirmed = true
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type abandonResponse struct {
|
|
ClaimID string
|
|
Error error
|
|
Amount float64
|
|
}
|
|
|
|
func abandonSupports(s *Sync) (float64, error) {
|
|
start := time.Now()
|
|
defer func(start time.Time) {
|
|
timing.TimedComponent("abandonSupports").Add(time.Since(start))
|
|
}(start)
|
|
totalPages := uint64(1)
|
|
var allSupports []jsonrpc.Claim
|
|
defaultAccount, err := s.getDefaultAccount()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
for page := uint64(1); page <= totalPages; page++ {
|
|
supports, err := s.daemon.SupportList(&defaultAccount, page, 50)
|
|
if err != nil {
|
|
return 0, errors.Prefix("cannot list claims", err)
|
|
}
|
|
allSupports = append(allSupports, (*supports).Items...)
|
|
totalPages = (*supports).TotalPages
|
|
}
|
|
producerWG := &stop.Group{}
|
|
|
|
claimIDChan := make(chan string, len(allSupports))
|
|
abandonRspChan := make(chan abandonResponse, len(allSupports))
|
|
alreadyAbandoned := make(map[string]bool, len(allSupports))
|
|
producerWG.Add(1)
|
|
go func() {
|
|
defer producerWG.Done()
|
|
for _, support := range allSupports {
|
|
_, ok := alreadyAbandoned[support.ClaimID]
|
|
if ok {
|
|
continue
|
|
}
|
|
alreadyAbandoned[support.ClaimID] = true
|
|
claimIDChan <- support.ClaimID
|
|
}
|
|
}()
|
|
consumerWG := &stop.Group{}
|
|
//TODO: remove this once the SDK team fixes their RPC bugs....
|
|
s.daemon.SetRPCTimeout(60 * time.Second)
|
|
defer s.daemon.SetRPCTimeout(5 * time.Minute)
|
|
for i := 0; i < s.ConcurrentVideos; i++ {
|
|
consumerWG.Add(1)
|
|
go func() {
|
|
defer consumerWG.Done()
|
|
outer:
|
|
for {
|
|
claimID, more := <-claimIDChan
|
|
if !more {
|
|
return
|
|
} else {
|
|
summary, err := s.daemon.TxoSpend(util.PtrToString("support"), &claimID, nil, nil, nil, &defaultAccount)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") {
|
|
log.Errorf("Support abandon for %s timed out, retrying...", claimID)
|
|
summary, err = s.daemon.TxoSpend(util.PtrToString("support"), &claimID, nil, nil, nil, &defaultAccount)
|
|
if err != nil {
|
|
//TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK
|
|
abandonRspChan <- abandonResponse{
|
|
ClaimID: claimID,
|
|
Error: err,
|
|
Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it
|
|
}
|
|
continue
|
|
}
|
|
} else {
|
|
abandonRspChan <- abandonResponse{
|
|
ClaimID: claimID,
|
|
Error: err,
|
|
Amount: 0,
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
if summary == nil || len(*summary) < 1 {
|
|
abandonRspChan <- abandonResponse{
|
|
ClaimID: claimID,
|
|
Error: errors.Err("error abandoning supports: no outputs while abandoning %s", claimID),
|
|
Amount: 0,
|
|
}
|
|
continue
|
|
}
|
|
var outputAmount float64
|
|
for _, tx := range *summary {
|
|
amount, err := strconv.ParseFloat(tx.Outputs[0].Amount, 64)
|
|
if err != nil {
|
|
abandonRspChan <- abandonResponse{
|
|
ClaimID: claimID,
|
|
Error: errors.Err(err),
|
|
Amount: 0,
|
|
}
|
|
continue outer
|
|
}
|
|
outputAmount += amount
|
|
}
|
|
if err != nil {
|
|
abandonRspChan <- abandonResponse{
|
|
ClaimID: claimID,
|
|
Error: errors.Err(err),
|
|
Amount: 0,
|
|
}
|
|
continue
|
|
}
|
|
log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID)
|
|
abandonRspChan <- abandonResponse{
|
|
ClaimID: claimID,
|
|
Error: nil,
|
|
Amount: outputAmount,
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
producerWG.Wait()
|
|
close(claimIDChan)
|
|
consumerWG.Wait()
|
|
close(abandonRspChan)
|
|
|
|
totalAbandoned := 0.0
|
|
for r := range abandonRspChan {
|
|
if r.Error != nil {
|
|
log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error())
|
|
continue
|
|
}
|
|
totalAbandoned += r.Amount
|
|
}
|
|
return totalAbandoned, nil
|
|
}
|
|
|
|
type updateInfo struct {
|
|
ClaimID string
|
|
streamUpdateOptions *jsonrpc.StreamUpdateOptions
|
|
videoStatus *sdk.VideoStatus
|
|
}
|
|
|
|
func transferVideos(s *Sync) error {
|
|
start := time.Now()
|
|
defer func(start time.Time) {
|
|
timing.TimedComponent("transferVideos").Add(time.Since(start))
|
|
}(start)
|
|
cleanTransfer := true
|
|
|
|
streamChan := make(chan updateInfo, s.ConcurrentVideos)
|
|
account, err := s.getDefaultAccount()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
streams, err := s.daemon.StreamList(&account, 1, 30000)
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
producerWG := &stop.Group{}
|
|
producerWG.Add(1)
|
|
go func() {
|
|
defer producerWG.Done()
|
|
for _, video := range s.syncedVideos {
|
|
if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion {
|
|
continue
|
|
}
|
|
|
|
var stream *jsonrpc.Claim = nil
|
|
for _, c := range streams.Items {
|
|
if c.ClaimID != video.ClaimID || (c.SigningChannel != nil && c.SigningChannel.ClaimID != s.lbryChannelID) {
|
|
continue
|
|
}
|
|
stream = &c
|
|
break
|
|
}
|
|
if stream == nil {
|
|
return
|
|
}
|
|
|
|
streamUpdateOptions := jsonrpc.StreamUpdateOptions{
|
|
StreamCreateOptions: &jsonrpc.StreamCreateOptions{
|
|
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
|
|
ClaimAddress: &s.clientPublishAddress,
|
|
FundingAccountIDs: []string{
|
|
account,
|
|
},
|
|
},
|
|
},
|
|
Bid: util.PtrToString("0.005"), // Todo - Dont hardcode
|
|
}
|
|
videoStatus := sdk.VideoStatus{
|
|
ChannelID: s.YoutubeChannelID,
|
|
VideoID: video.VideoID,
|
|
ClaimID: video.ClaimID,
|
|
ClaimName: video.ClaimName,
|
|
Status: VideoStatusPublished,
|
|
IsTransferred: util.PtrToBool(true),
|
|
}
|
|
streamChan <- updateInfo{
|
|
ClaimID: video.ClaimID,
|
|
streamUpdateOptions: &streamUpdateOptions,
|
|
videoStatus: &videoStatus,
|
|
}
|
|
}
|
|
}()
|
|
|
|
consumerWG := &stop.Group{}
|
|
for i := 0; i < s.ConcurrentVideos; i++ {
|
|
consumerWG.Add(1)
|
|
go func(worker int) {
|
|
defer consumerWG.Done()
|
|
for {
|
|
ui, more := <-streamChan
|
|
if !more {
|
|
return
|
|
} else {
|
|
err := s.streamUpdate(&ui)
|
|
if err != nil {
|
|
cleanTransfer = false
|
|
}
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
producerWG.Wait()
|
|
close(streamChan)
|
|
consumerWG.Wait()
|
|
|
|
if !cleanTransfer {
|
|
return errors.Err("A video has failed to transfer for the channel...skipping channel transfer")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Sync) streamUpdate(ui *updateInfo) error {
|
|
start := time.Now()
|
|
result, updateError := s.daemon.StreamUpdate(ui.ClaimID, *ui.streamUpdateOptions)
|
|
timing.TimedComponent("transferStreamUpdate").Add(time.Since(start))
|
|
if updateError != nil {
|
|
ui.videoStatus.FailureReason = updateError.Error()
|
|
ui.videoStatus.Status = VideoStatusTranferFailed
|
|
ui.videoStatus.IsTransferred = util.PtrToBool(false)
|
|
} else {
|
|
ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0)
|
|
}
|
|
log.Infof("TRANSFERRED %t", *ui.videoStatus.IsTransferred)
|
|
statusErr := s.APIConfig.MarkVideoStatus(*ui.videoStatus)
|
|
if statusErr != nil {
|
|
return errors.Prefix(statusErr.Error(), updateError)
|
|
}
|
|
return errors.Err(updateError)
|
|
}
|
|
|
|
func transferChannel(s *Sync) error {
|
|
start := time.Now()
|
|
defer func(start time.Time) {
|
|
timing.TimedComponent("transferChannel").Add(time.Since(start))
|
|
}(start)
|
|
account, err := s.getDefaultAccount()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
channelClaims, err := s.daemon.ChannelList(&account, 1, 50, nil)
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
var channelClaim *jsonrpc.Transaction = nil
|
|
for _, c := range channelClaims.Items {
|
|
if c.ClaimID != s.lbryChannelID {
|
|
continue
|
|
}
|
|
channelClaim = &c
|
|
break
|
|
}
|
|
if channelClaim == nil {
|
|
return nil
|
|
}
|
|
|
|
updateOptions := jsonrpc.ChannelUpdateOptions{
|
|
Bid: util.PtrToString(fmt.Sprintf("%.6f", channelClaimAmount-0.005)),
|
|
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
|
|
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
|
|
ClaimAddress: &s.clientPublishAddress,
|
|
},
|
|
},
|
|
}
|
|
result, err := s.daemon.ChannelUpdate(s.lbryChannelID, updateOptions)
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
log.Infof("TRANSFERRED %t", len(result.Outputs) != 0)
|
|
|
|
return nil
|
|
}
|