parallelize support abandons

update lbrynet support in e2e test
stop using reflector on regtest!
change e2e channel target for more videos
possibly fix a deadlock
This commit is contained in:
Niko Storni 2019-10-11 19:41:55 +02:00
parent dd4aebdba4
commit a9e76149e8
5 changed files with 108 additions and 27 deletions

View file

@ -6,6 +6,7 @@ lbryum_servers:
- walletserver:50001 - walletserver:50001
save_blobs: true save_blobs: true
save_files: false save_files: false
reflect_streams: false #for the love of god, don't upload regtest streams to reflector!
share_usage_data: false share_usage_data: false
tcp_port: 3333 tcp_port: 3333
udp_port: 4444 udp_port: 4444

View file

@ -21,7 +21,7 @@ services:
## Wallet Server ## ## Wallet Server ##
################### ###################
walletserver: walletserver:
image: lbry/wallet-server:v0.42.0 image: lbry/wallet-server:v0.42.2
restart: always restart: always
environment: environment:
- DB_DIRECTORY=/database - DB_DIRECTORY=/database
@ -47,7 +47,7 @@ services:
## Lbrynet ## ## Lbrynet ##
############# #############
lbrynet: lbrynet:
image: lbry/lbrynet:v0.42.0 image: lbry/lbrynet:v0.42.2
restart: always restart: always
ports: ports:
- "15100:5279" - "15100:5279"

View file

@ -53,7 +53,7 @@ echo "successfully started..."
./data_setup.sh ./data_setup.sh
# Execute the sync test! # Execute the sync test!
./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container ./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 2 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container
status=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM youtube_data WHERE id=1') status=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM youtube_data WHERE id=1')
videoStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM synced_video WHERE id=1') videoStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM synced_video WHERE id=1')
videoClaimID1=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1') videoClaimID1=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1')
@ -71,7 +71,7 @@ mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e "UPDATE youtube_data
# Trigger transfer api # Trigger transfer api
curl -i -H 'Accept: application/json' -H 'Content-Type: application/json' 'http://localhost:15400/yt/transfer?auth_token=youtubertoken&address=n4eYeXAYmHo4YRUDEfsEhucy8y5LKRMcHg&public_key=tpubDA9GDAntyJu4hD3wU7175p7CuV6DWbYXfyb2HedBA3yuBp9HZ4n3QE4Ex6RHCSiEuVp2nKAL1Lzf2ZLo9ApaFgNaJjG6Xo1wB3iEeVbrDZp' curl -i -H 'Accept: application/json' -H 'Content-Type: application/json' 'http://localhost:15400/yt/transfer?auth_token=youtubertoken&address=n4eYeXAYmHo4YRUDEfsEhucy8y5LKRMcHg&public_key=tpubDA9GDAntyJu4hD3wU7175p7CuV6DWbYXfyb2HedBA3yuBp9HZ4n3QE4Ex6RHCSiEuVp2nKAL1Lzf2ZLo9ApaFgNaJjG6Xo1wB3iEeVbrDZp'
# Execute the transfer test! # Execute the transfer test!
./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container ./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 2 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container
# Check that the channel and the video are marked as transferred and that all supports are spent # Check that the channel and the video are marked as transferred and that all supports are spent
channelTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transfer_state FROM youtube_data') channelTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transfer_state FROM youtube_data')
videoTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transferred FROM synced_video') videoTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transferred FROM synced_video')

View file

@ -6,6 +6,7 @@ lbryum_servers:
- walletserver:50001 - walletserver:50001
save_blobs: true save_blobs: true
save_files: false save_files: false
reflect_streams: false #for the love of god, don't upload regtest streams to reflector!
share_usage_data: false share_usage_data: false
tcp_port: 3333 tcp_port: 3333
udp_port: 4444 udp_port: 4444

View file

@ -2,13 +2,18 @@ package manager
import ( import (
"fmt" "fmt"
"strconv"
"strings"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/sdk" "github.com/lbryio/ytsync/sdk"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv"
) )
func waitConfirmations(s *Sync) error { func waitConfirmations(s *Sync) error {
@ -40,6 +45,12 @@ waiting:
return nil return nil
} }
type abandonResponse struct {
ClaimID string
Error error
Amount float64
}
func abandonSupports(s *Sync) (float64, error) { func abandonSupports(s *Sync) (float64, error) {
totalPages := uint64(1) totalPages := uint64(1)
var allSupports []jsonrpc.Claim var allSupports []jsonrpc.Claim
@ -55,31 +66,99 @@ func abandonSupports(s *Sync) (float64, error) {
allSupports = append(allSupports, (*supports).Items...) allSupports = append(allSupports, (*supports).Items...)
totalPages = (*supports).TotalPages totalPages = (*supports).TotalPages
} }
producerWG := &stop.Group{}
claimIDChan := make(chan string, len(allSupports))
abandonRspChan := make(chan abandonResponse, len(allSupports))
alreadyAbandoned := make(map[string]bool, len(allSupports)) alreadyAbandoned := make(map[string]bool, len(allSupports))
producerWG.Add(1)
go func() {
defer producerWG.Done()
for _, support := range allSupports {
_, ok := alreadyAbandoned[support.ClaimID]
if ok {
continue
}
alreadyAbandoned[support.ClaimID] = true
claimIDChan <- support.ClaimID
}
}()
consumerWG := &stop.Group{}
//TODO: remove this once the SDK team fixes their RPC bugs....
s.daemon.SetRPCTimeout(30 * time.Second)
defer s.daemon.SetRPCTimeout(40 * time.Minute)
for i := 0; i < s.ConcurrentVideos; i++ {
consumerWG.Add(1)
go func() {
defer consumerWG.Done()
for {
claimID, more := <-claimIDChan
if !more {
return
} else {
summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
if err != nil {
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") {
log.Errorf("Support abandon for %s timed out, retrying...", claimID)
summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil)
if err != nil {
//TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK
abandonRspChan <- abandonResponse{
ClaimID: claimID,
Error: err,
Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it
}
return
}
} else {
abandonRspChan <- abandonResponse{
ClaimID: claimID,
Error: err,
Amount: 0,
}
return
}
}
if len(summary.Outputs) < 1 {
abandonRspChan <- abandonResponse{
ClaimID: claimID,
Error: errors.Err("error abandoning supports: no outputs while abandoning %s", claimID),
Amount: 0,
}
return
}
outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64)
if err != nil {
abandonRspChan <- abandonResponse{
ClaimID: claimID,
Error: errors.Err(err),
Amount: 0,
}
return
}
log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID)
abandonRspChan <- abandonResponse{
ClaimID: claimID,
Error: nil,
Amount: outputAmount,
}
return
}
}
}()
}
producerWG.Wait()
close(claimIDChan)
consumerWG.Wait()
close(abandonRspChan)
totalAbandoned := 0.0 totalAbandoned := 0.0
for _, support := range allSupports { for r := range abandonRspChan {
_, ok := alreadyAbandoned[support.ClaimID] if r.Error != nil {
if ok { log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error())
continue continue
} }
supportOnTransferredClaim := support.Address == s.clientPublishAddress //todo: probably not needed anymore totalAbandoned += r.Amount
if supportOnTransferredClaim {
continue
}
alreadyAbandoned[support.ClaimID] = true
summary, err := s.daemon.SupportAbandon(&support.ClaimID, nil, nil, nil, nil)
if err != nil {
return totalAbandoned, errors.Err(err)
}
if len(summary.Outputs) < 1 {
return totalAbandoned, errors.Err("error abandoning supports: no outputs while abandoning %s", support.ClaimID)
}
outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64)
if err != nil {
return totalAbandoned, errors.Err(err)
}
totalAbandoned += outputAmount
log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, support.ClaimID)
} }
return totalAbandoned, nil return totalAbandoned, nil
} }
@ -105,6 +184,7 @@ func transferVideos(s *Sync) error {
producerWG := &stop.Group{} producerWG := &stop.Group{}
producerWG.Add(1) producerWG.Add(1)
go func() { go func() {
defer producerWG.Done()
for _, video := range s.syncedVideos { for _, video := range s.syncedVideos {
if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion { if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion {
continue continue
@ -142,7 +222,6 @@ func transferVideos(s *Sync) error {
videoStatus: &videoStatus, videoStatus: &videoStatus,
} }
} }
producerWG.Done()
}() }()
consumerWG := &stop.Group{} consumerWG := &stop.Group{}