diff --git a/go.mod b/go.mod index 9a4c6fc..05f7fca 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/docker/go-units v0.4.0 // indirect github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect - github.com/google/btree v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.1.0 // indirect github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/hashicorp/golang-lru v0.5.3 // indirect @@ -34,3 +33,5 @@ require ( google.golang.org/api v0.11.0 google.golang.org/appengine v1.6.5 // indirect ) + +go 1.13 diff --git a/manager/transfer.go b/manager/transfer.go index 485b9a9..ff86d60 100644 --- a/manager/transfer.go +++ b/manager/transfer.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/uber-go/atomic" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/stop" @@ -45,13 +47,13 @@ waiting: return nil } -type abandonResponse struct { - ClaimID string - Error error - Amount float64 -} - func abandonSupports(s *Sync) (float64, error) { + type abandonResponse struct { + ClaimID string + Error error + Amount float64 + } + totalPages := uint64(1) var allSupports []jsonrpc.Claim defaultAccount, err := s.getDefaultAccount() @@ -66,23 +68,25 @@ func abandonSupports(s *Sync) (float64, error) { allSupports = append(allSupports, (*supports).Items...) totalPages = (*supports).TotalPages } - producerWG := &stop.Group{} - claimIDChan := make(chan string, len(allSupports)) - abandonRspChan := make(chan abandonResponse, len(allSupports)) + claimIDChan := make(chan string) + abandonRspChan := make(chan abandonResponse) + collectorChan := make(chan bool) + alreadyAbandoned := make(map[string]bool, len(allSupports)) - producerWG.Add(1) + totalAbandoned := atomic.NewFloat64(0) + go func() { - defer producerWG.Done() - for _, support := range allSupports { - _, ok := alreadyAbandoned[support.ClaimID] - if ok { + for r := range abandonRspChan { + if r.Error != nil { + log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error()) continue } - alreadyAbandoned[support.ClaimID] = true - claimIDChan <- support.ClaimID + totalAbandoned.Add(r.Amount) } + close(collectorChan) }() + consumerWG := &stop.Group{} //TODO: remove this once the SDK team fixes their RPC bugs.... s.daemon.SetRPCTimeout(30 * time.Second) @@ -95,72 +99,64 @@ func abandonSupports(s *Sync) (float64, error) { claimID, more := <-claimIDChan if !more { return - } else { - summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil) - if err != nil { - if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") { - log.Errorf("Support abandon for %s timed out, retrying...", claimID) - summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil) - if err != nil { - //TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK - abandonRspChan <- abandonResponse{ - ClaimID: claimID, - Error: err, - Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it - } - continue - } - } else { + } + + summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil) + if err != nil { + if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") { + log.Errorf("Support abandon for %s timed out, retrying...", claimID) + summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil) + if err != nil { + //TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK abandonRspChan <- abandonResponse{ ClaimID: claimID, Error: err, - Amount: 0, + Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it } continue } - } - if len(summary.Outputs) < 1 { - abandonRspChan <- abandonResponse{ - ClaimID: claimID, - Error: errors.Err("error abandoning supports: no outputs while abandoning %s", claimID), - Amount: 0, - } + } else { + abandonRspChan <- abandonResponse{ClaimID: claimID, Error: err} continue } - outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64) - if err != nil { - abandonRspChan <- abandonResponse{ - ClaimID: claimID, - Error: errors.Err(err), - Amount: 0, - } - continue - } - log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID) + } + + if len(summary.Outputs) < 1 { abandonRspChan <- abandonResponse{ ClaimID: claimID, - Error: nil, - Amount: outputAmount, + Error: errors.Err("abandoning supports: no outputs for %s", claimID), } continue } + + outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64) + if err != nil { + abandonRspChan <- abandonResponse{ClaimID: claimID, Error: errors.Err(err)} + continue + } + + log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID) + abandonRspChan <- abandonResponse{ClaimID: claimID, Amount: outputAmount} + continue } }() } - producerWG.Wait() - close(claimIDChan) - consumerWG.Wait() - close(abandonRspChan) - totalAbandoned := 0.0 - for r := range abandonRspChan { - if r.Error != nil { - log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error()) + for _, support := range allSupports { + _, ok := alreadyAbandoned[support.ClaimID] + if ok { continue } - totalAbandoned += r.Amount + alreadyAbandoned[support.ClaimID] = true + claimIDChan <- support.ClaimID } - return totalAbandoned, nil + close(claimIDChan) + + consumerWG.Wait() + close(abandonRspChan) + <-collectorChan + + return totalAbandoned.Load(), nil } type updateInfo struct { diff --git a/manager/ytsync.go b/manager/ytsync.go index 314dec1..15a53d6 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -824,9 +824,9 @@ func (s *Sync) startWorker(workerNum int) { err := s.processVideo(v) if err != nil { - logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error()) + logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), errors.FullTrace(err)) log.Errorln(logMsg) - if strings.Contains(strings.ToLower(err.Error()), "interrupted by user") { + if errors.Is(err, sources.ErrInterruptedByUser) { return } fatalErrors := []string{ diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 99192fa..e9d16e2 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -31,6 +31,8 @@ import ( "google.golang.org/api/youtube/v3" ) +var ErrInterruptedByUser = errors.Base("interrupted by user") + type YoutubeVideo struct { id string title string @@ -226,7 +228,7 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { for { select { case <-v.stopGroup.Ch(): - return errors.Err("interrupted by user") + return errors.Err(ErrInterruptedByUser) default: }