Compare commits
1 commit
Author | SHA1 | Date | |
---|---|---|---|
|
8b32381b23 |
4 changed files with 65 additions and 66 deletions
3
go.mod
3
go.mod
|
@ -13,7 +13,6 @@ require (
|
||||||
github.com/docker/go-units v0.4.0 // indirect
|
github.com/docker/go-units v0.4.0 // indirect
|
||||||
github.com/go-sql-driver/mysql v1.4.1 // indirect
|
github.com/go-sql-driver/mysql v1.4.1 // indirect
|
||||||
github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect
|
github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect
|
||||||
github.com/google/btree v1.0.0 // indirect
|
|
||||||
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
|
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
|
||||||
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
|
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
|
||||||
github.com/hashicorp/golang-lru v0.5.3 // indirect
|
github.com/hashicorp/golang-lru v0.5.3 // indirect
|
||||||
|
@ -34,3 +33,5 @@ require (
|
||||||
google.golang.org/api v0.11.0
|
google.golang.org/api v0.11.0
|
||||||
google.golang.org/appengine v1.6.5 // indirect
|
google.golang.org/appengine v1.6.5 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
|
go 1.13
|
||||||
|
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/uber-go/atomic"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
|
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
|
||||||
"github.com/lbryio/lbry.go/v2/extras/stop"
|
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||||
|
@ -45,13 +47,13 @@ waiting:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type abandonResponse struct {
|
|
||||||
ClaimID string
|
|
||||||
Error error
|
|
||||||
Amount float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func abandonSupports(s *Sync) (float64, error) {
|
func abandonSupports(s *Sync) (float64, error) {
|
||||||
|
type abandonResponse struct {
|
||||||
|
ClaimID string
|
||||||
|
Error error
|
||||||
|
Amount float64
|
||||||
|
}
|
||||||
|
|
||||||
totalPages := uint64(1)
|
totalPages := uint64(1)
|
||||||
var allSupports []jsonrpc.Claim
|
var allSupports []jsonrpc.Claim
|
||||||
defaultAccount, err := s.getDefaultAccount()
|
defaultAccount, err := s.getDefaultAccount()
|
||||||
|
@ -66,23 +68,25 @@ func abandonSupports(s *Sync) (float64, error) {
|
||||||
allSupports = append(allSupports, (*supports).Items...)
|
allSupports = append(allSupports, (*supports).Items...)
|
||||||
totalPages = (*supports).TotalPages
|
totalPages = (*supports).TotalPages
|
||||||
}
|
}
|
||||||
producerWG := &stop.Group{}
|
|
||||||
|
|
||||||
claimIDChan := make(chan string, len(allSupports))
|
claimIDChan := make(chan string)
|
||||||
abandonRspChan := make(chan abandonResponse, len(allSupports))
|
abandonRspChan := make(chan abandonResponse)
|
||||||
|
collectorChan := make(chan bool)
|
||||||
|
|
||||||
alreadyAbandoned := make(map[string]bool, len(allSupports))
|
alreadyAbandoned := make(map[string]bool, len(allSupports))
|
||||||
producerWG.Add(1)
|
totalAbandoned := atomic.NewFloat64(0)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer producerWG.Done()
|
for r := range abandonRspChan {
|
||||||
for _, support := range allSupports {
|
if r.Error != nil {
|
||||||
_, ok := alreadyAbandoned[support.ClaimID]
|
log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error())
|
||||||
if ok {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
alreadyAbandoned[support.ClaimID] = true
|
totalAbandoned.Add(r.Amount)
|
||||||
claimIDChan <- support.ClaimID
|
|
||||||
}
|
}
|
||||||
|
close(collectorChan)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
consumerWG := &stop.Group{}
|
consumerWG := &stop.Group{}
|
||||||
//TODO: remove this once the SDK team fixes their RPC bugs....
|
//TODO: remove this once the SDK team fixes their RPC bugs....
|
||||||
s.daemon.SetRPCTimeout(30 * time.Second)
|
s.daemon.SetRPCTimeout(30 * time.Second)
|
||||||
|
@ -95,72 +99,64 @@ func abandonSupports(s *Sync) (float64, error) {
|
||||||
claimID, more := <-claimIDChan
|
claimID, more := <-claimIDChan
|
||||||
if !more {
|
if !more {
|
||||||
return
|
return
|
||||||
} else {
|
}
|
||||||
summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
|
|
||||||
if err != nil {
|
summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
|
||||||
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") {
|
if err != nil {
|
||||||
log.Errorf("Support abandon for %s timed out, retrying...", claimID)
|
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") {
|
||||||
summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
|
log.Errorf("Support abandon for %s timed out, retrying...", claimID)
|
||||||
if err != nil {
|
summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
|
||||||
//TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK
|
if err != nil {
|
||||||
abandonRspChan <- abandonResponse{
|
//TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK
|
||||||
ClaimID: claimID,
|
|
||||||
Error: err,
|
|
||||||
Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
abandonRspChan <- abandonResponse{
|
abandonRspChan <- abandonResponse{
|
||||||
ClaimID: claimID,
|
ClaimID: claimID,
|
||||||
Error: err,
|
Error: err,
|
||||||
Amount: 0,
|
Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
if len(summary.Outputs) < 1 {
|
abandonRspChan <- abandonResponse{ClaimID: claimID, Error: err}
|
||||||
abandonRspChan <- abandonResponse{
|
|
||||||
ClaimID: claimID,
|
|
||||||
Error: errors.Err("error abandoning supports: no outputs while abandoning %s", claimID),
|
|
||||||
Amount: 0,
|
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64)
|
}
|
||||||
if err != nil {
|
|
||||||
abandonRspChan <- abandonResponse{
|
if len(summary.Outputs) < 1 {
|
||||||
ClaimID: claimID,
|
|
||||||
Error: errors.Err(err),
|
|
||||||
Amount: 0,
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID)
|
|
||||||
abandonRspChan <- abandonResponse{
|
abandonRspChan <- abandonResponse{
|
||||||
ClaimID: claimID,
|
ClaimID: claimID,
|
||||||
Error: nil,
|
Error: errors.Err("abandoning supports: no outputs for %s", claimID),
|
||||||
Amount: outputAmount,
|
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64)
|
||||||
|
if err != nil {
|
||||||
|
abandonRspChan <- abandonResponse{ClaimID: claimID, Error: errors.Err(err)}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID)
|
||||||
|
abandonRspChan <- abandonResponse{ClaimID: claimID, Amount: outputAmount}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
producerWG.Wait()
|
|
||||||
close(claimIDChan)
|
|
||||||
consumerWG.Wait()
|
|
||||||
close(abandonRspChan)
|
|
||||||
|
|
||||||
totalAbandoned := 0.0
|
for _, support := range allSupports {
|
||||||
for r := range abandonRspChan {
|
_, ok := alreadyAbandoned[support.ClaimID]
|
||||||
if r.Error != nil {
|
if ok {
|
||||||
log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error())
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
totalAbandoned += r.Amount
|
alreadyAbandoned[support.ClaimID] = true
|
||||||
|
claimIDChan <- support.ClaimID
|
||||||
}
|
}
|
||||||
return totalAbandoned, nil
|
close(claimIDChan)
|
||||||
|
|
||||||
|
consumerWG.Wait()
|
||||||
|
close(abandonRspChan)
|
||||||
|
<-collectorChan
|
||||||
|
|
||||||
|
return totalAbandoned.Load(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type updateInfo struct {
|
type updateInfo struct {
|
||||||
|
|
|
@ -824,9 +824,9 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
err := s.processVideo(v)
|
err := s.processVideo(v)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error())
|
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), errors.FullTrace(err))
|
||||||
log.Errorln(logMsg)
|
log.Errorln(logMsg)
|
||||||
if strings.Contains(strings.ToLower(err.Error()), "interrupted by user") {
|
if errors.Is(err, sources.ErrInterruptedByUser) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fatalErrors := []string{
|
fatalErrors := []string{
|
||||||
|
|
|
@ -31,6 +31,8 @@ import (
|
||||||
"google.golang.org/api/youtube/v3"
|
"google.golang.org/api/youtube/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrInterruptedByUser = errors.Base("interrupted by user")
|
||||||
|
|
||||||
type YoutubeVideo struct {
|
type YoutubeVideo struct {
|
||||||
id string
|
id string
|
||||||
title string
|
title string
|
||||||
|
@ -226,7 +228,7 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-v.stopGroup.Ch():
|
case <-v.stopGroup.Ch():
|
||||||
return errors.Err("interrupted by user")
|
return errors.Err(ErrInterruptedByUser)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue