Compare commits

...

1 commit
master ... grin

Author SHA1 Message Date
Alex Grintsvayg
8b32381b23
grin 2019-10-16 15:00:11 -04:00
4 changed files with 65 additions and 66 deletions

3
go.mod
View file

@ -13,7 +13,6 @@ require (
github.com/docker/go-units v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect
github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect
github.com/google/btree v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/hashicorp/golang-lru v0.5.3 // indirect
@ -34,3 +33,5 @@ require (
google.golang.org/api v0.11.0 google.golang.org/api v0.11.0
google.golang.org/appengine v1.6.5 // indirect google.golang.org/appengine v1.6.5 // indirect
) )
go 1.13

View file

@ -6,6 +6,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/uber-go/atomic"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/stop"
@ -45,13 +47,13 @@ waiting:
return nil return nil
} }
type abandonResponse struct {
ClaimID string
Error error
Amount float64
}
func abandonSupports(s *Sync) (float64, error) { func abandonSupports(s *Sync) (float64, error) {
type abandonResponse struct {
ClaimID string
Error error
Amount float64
}
totalPages := uint64(1) totalPages := uint64(1)
var allSupports []jsonrpc.Claim var allSupports []jsonrpc.Claim
defaultAccount, err := s.getDefaultAccount() defaultAccount, err := s.getDefaultAccount()
@ -66,23 +68,25 @@ func abandonSupports(s *Sync) (float64, error) {
allSupports = append(allSupports, (*supports).Items...) allSupports = append(allSupports, (*supports).Items...)
totalPages = (*supports).TotalPages totalPages = (*supports).TotalPages
} }
producerWG := &stop.Group{}
claimIDChan := make(chan string, len(allSupports)) claimIDChan := make(chan string)
abandonRspChan := make(chan abandonResponse, len(allSupports)) abandonRspChan := make(chan abandonResponse)
collectorChan := make(chan bool)
alreadyAbandoned := make(map[string]bool, len(allSupports)) alreadyAbandoned := make(map[string]bool, len(allSupports))
producerWG.Add(1) totalAbandoned := atomic.NewFloat64(0)
go func() { go func() {
defer producerWG.Done() for r := range abandonRspChan {
for _, support := range allSupports { if r.Error != nil {
_, ok := alreadyAbandoned[support.ClaimID] log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error())
if ok {
continue continue
} }
alreadyAbandoned[support.ClaimID] = true totalAbandoned.Add(r.Amount)
claimIDChan <- support.ClaimID
} }
close(collectorChan)
}() }()
consumerWG := &stop.Group{} consumerWG := &stop.Group{}
//TODO: remove this once the SDK team fixes their RPC bugs.... //TODO: remove this once the SDK team fixes their RPC bugs....
s.daemon.SetRPCTimeout(30 * time.Second) s.daemon.SetRPCTimeout(30 * time.Second)
@ -95,72 +99,64 @@ func abandonSupports(s *Sync) (float64, error) {
claimID, more := <-claimIDChan claimID, more := <-claimIDChan
if !more { if !more {
return return
} else { }
summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
if err != nil { summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") { if err != nil {
log.Errorf("Support abandon for %s timed out, retrying...", claimID) if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") {
summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil) log.Errorf("Support abandon for %s timed out, retrying...", claimID)
if err != nil { summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
//TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK if err != nil {
abandonRspChan <- abandonResponse{ //TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK
ClaimID: claimID,
Error: err,
Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it
}
continue
}
} else {
abandonRspChan <- abandonResponse{ abandonRspChan <- abandonResponse{
ClaimID: claimID, ClaimID: claimID,
Error: err, Error: err,
Amount: 0, Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it
} }
continue continue
} }
} } else {
if len(summary.Outputs) < 1 { abandonRspChan <- abandonResponse{ClaimID: claimID, Error: err}
abandonRspChan <- abandonResponse{
ClaimID: claimID,
Error: errors.Err("error abandoning supports: no outputs while abandoning %s", claimID),
Amount: 0,
}
continue continue
} }
outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64) }
if err != nil {
abandonRspChan <- abandonResponse{ if len(summary.Outputs) < 1 {
ClaimID: claimID,
Error: errors.Err(err),
Amount: 0,
}
continue
}
log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID)
abandonRspChan <- abandonResponse{ abandonRspChan <- abandonResponse{
ClaimID: claimID, ClaimID: claimID,
Error: nil, Error: errors.Err("abandoning supports: no outputs for %s", claimID),
Amount: outputAmount,
} }
continue continue
} }
outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64)
if err != nil {
abandonRspChan <- abandonResponse{ClaimID: claimID, Error: errors.Err(err)}
continue
}
log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID)
abandonRspChan <- abandonResponse{ClaimID: claimID, Amount: outputAmount}
continue
} }
}() }()
} }
producerWG.Wait()
close(claimIDChan)
consumerWG.Wait()
close(abandonRspChan)
totalAbandoned := 0.0 for _, support := range allSupports {
for r := range abandonRspChan { _, ok := alreadyAbandoned[support.ClaimID]
if r.Error != nil { if ok {
log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error())
continue continue
} }
totalAbandoned += r.Amount alreadyAbandoned[support.ClaimID] = true
claimIDChan <- support.ClaimID
} }
return totalAbandoned, nil close(claimIDChan)
consumerWG.Wait()
close(abandonRspChan)
<-collectorChan
return totalAbandoned.Load(), nil
} }
type updateInfo struct { type updateInfo struct {

View file

@ -824,9 +824,9 @@ func (s *Sync) startWorker(workerNum int) {
err := s.processVideo(v) err := s.processVideo(v)
if err != nil { if err != nil {
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error()) logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), errors.FullTrace(err))
log.Errorln(logMsg) log.Errorln(logMsg)
if strings.Contains(strings.ToLower(err.Error()), "interrupted by user") { if errors.Is(err, sources.ErrInterruptedByUser) {
return return
} }
fatalErrors := []string{ fatalErrors := []string{

View file

@ -31,6 +31,8 @@ import (
"google.golang.org/api/youtube/v3" "google.golang.org/api/youtube/v3"
) )
var ErrInterruptedByUser = errors.Base("interrupted by user")
type YoutubeVideo struct { type YoutubeVideo struct {
id string id string
title string title string
@ -226,7 +228,7 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
for { for {
select { select {
case <-v.stopGroup.Ch(): case <-v.stopGroup.Ch():
return errors.Err("interrupted by user") return errors.Err(ErrInterruptedByUser)
default: default:
} }