This commit is contained in:
Niko Storni 2020-11-03 02:14:01 +01:00
parent ca8ff505d4
commit beade71aa6
4 changed files with 29 additions and 9 deletions

View file

@ -55,6 +55,7 @@ func (s *SyncManager) Start() error {
} }
var lastChannelProcessed string var lastChannelProcessed string
var secondLastChannelProcessed string
syncCount := 0 syncCount := 0
for { for {
s.channelsToSync = make([]Sync, 0, 10) // reset sync queue s.channelsToSync = make([]Sync, 0, 10) // reset sync queue
@ -108,12 +109,13 @@ func (s *SyncManager) Start() error {
time.Sleep(5 * time.Minute) time.Sleep(5 * time.Minute)
} }
for _, sync := range s.channelsToSync { for _, sync := range s.channelsToSync {
if lastChannelProcessed == sync.DbChannelData.ChannelId { if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed {
util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId) util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId)
stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.DbChannelData.DesiredChannelName) stopTheLoops := errors.Err("Found channel %s running 3 times, set it to failed, and reprocess later", sync.DbChannelData.DesiredChannelName)
sync.setChannelTerminationStatus(&stopTheLoops) sync.setChannelTerminationStatus(&stopTheLoops)
continue continue
} }
secondLastChannelProcessed = lastChannelProcessed
lastChannelProcessed = sync.DbChannelData.ChannelId lastChannelProcessed = sync.DbChannelData.ChannelId
shouldNotCount := false shouldNotCount := false
logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1) logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1)

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"math" "math"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
@ -381,9 +382,18 @@ func (s *Sync) ensureChannelOwnership() error {
} }
channelInfo, err := ytapi.ChannelInfo(s.DbChannelData.ChannelId) channelInfo, err := ytapi.ChannelInfo(s.DbChannelData.ChannelId)
if err != nil {
if strings.Contains(err.Error(), "invalid character 'e' looking for beginning of value") {
logUtils.SendInfoToSlack("failed to get channel data for %s. Waiting 1 minute to retry", s.DbChannelData.ChannelId)
time.Sleep(1 * time.Minute)
channelInfo, err = ytapi.ChannelInfo(s.DbChannelData.ChannelId)
if err != nil { if err != nil {
return err return err
} }
} else {
return err
}
}
thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig()) thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig())

View file

@ -262,11 +262,18 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
"interrupted by user", "interrupted by user",
"use --skip-space-check to ignore", "use --skip-space-check to ignore",
} }
dbWipeConditions := []string{
"Missing inputs",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) { if util.SubstringInSlice((*e).Error(), noFailConditions) {
return return
} }
channelStatus := shared.StatusFailed
if util.SubstringInSlice((*e).Error(), dbWipeConditions) {
channelStatus = shared.StatusWipeDb
}
failureReason := (*e).Error() failureReason := (*e).Error()
_, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusFailed, failureReason, transferState) _, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, channelStatus, failureReason, transferState)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.DbChannelData.DesiredChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s", s.DbChannelData.DesiredChannelName)
*e = errors.Prefix(msg+err.Error(), *e) *e = errors.Prefix(msg+err.Error(), *e)
@ -395,15 +402,14 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
} }
if claimToAbandon.Address != s.DbChannelData.PublishAddress && !s.syncedVideos[videoID].Transferred { if claimToAbandon.Address != s.DbChannelData.PublishAddress && !s.syncedVideos[videoID].Transferred {
log.Debugf("abandoning %+v", claimToAbandon) log.Debugf("abandoning %+v", claimToAbandon)
_, err := s.daemon.StreamAbandon(claimToAbandon.Txid, claimToAbandon.Nout, nil, false) _, err := s.daemon.StreamAbandon(claimToAbandon.Txid, claimToAbandon.Nout, nil, true)
if err != nil { if err != nil {
return true, err return true, err
} }
abandonedClaims = true
} else { } else {
log.Debugf("lbrynet stream abandon --txid=%s --nout=%d", claimToAbandon.Txid, claimToAbandon.Nout) log.Debugf("lbrynet stream abandon --txid=%s --nout=%d", claimToAbandon.Txid, claimToAbandon.Nout)
} }
abandonedClaims = true
//return true, nil
} }
return abandonedClaims, nil return abandonedClaims, nil
} }
@ -757,6 +763,7 @@ func (s *Sync) startWorker(workerNum int) {
"more than 90% of the space has been used.", "more than 90% of the space has been used.",
"Couldn't find private key for id", "Couldn't find private key for id",
"You already have a stream claim published under the name", "You already have a stream claim published under the name",
"Missing inputs",
} }
if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.CliFlags.StopOnError { if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.CliFlags.StopOnError {
s.grp.Stop() s.grp.Stop()
@ -806,7 +813,7 @@ func (s *Sync) startWorker(workerNum int) {
"Not enough funds to cover this transaction", "Not enough funds to cover this transaction",
"failed: Not enough funds", "failed: Not enough funds",
"Error in daemon: Insufficient funds, please deposit additional LBC", "Error in daemon: Insufficient funds, please deposit additional LBC",
"Missing inputs", //"Missing inputs",
}) { }) {
log.Println("checking funds and UTXOs before retrying...") log.Println("checking funds and UTXOs before retrying...")
err := s.walletSetup() err := s.walletSetup()

View file

@ -7,6 +7,7 @@ import (
) )
func TestChannelInfo(t *testing.T) { func TestChannelInfo(t *testing.T) {
_, _, err := ChannelInfo("", "UCNQfQvFMPnInwsU_iGYArJQ") info, err := ChannelInfo("UCNQfQvFMPnInwsU_iGYArJQ")
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, info)
} }