From a9e76149e86d0c5fd32e6ac4b1a500eff82ef6b7 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 11 Oct 2019 19:41:55 +0200 Subject: [PATCH] parallelize support abandons update lbrynet support in e2e test stop using reflector on regtest! change e2e channel target for more videos possibly fix a deadlock --- e2e/daemon_settings.yml | 1 + e2e/docker-compose.yml | 4 +- e2e/e2e.sh | 4 +- e2e/lbrynet/settings/daemon_settings.yml | 1 + manager/transfer.go | 125 ++++++++++++++++++----- 5 files changed, 108 insertions(+), 27 deletions(-) diff --git a/e2e/daemon_settings.yml b/e2e/daemon_settings.yml index 63b444e..7306a5e 100644 --- a/e2e/daemon_settings.yml +++ b/e2e/daemon_settings.yml @@ -6,6 +6,7 @@ lbryum_servers: - walletserver:50001 save_blobs: true save_files: false +reflect_streams: false #for the love of god, don't upload regtest streams to reflector! share_usage_data: false tcp_port: 3333 udp_port: 4444 diff --git a/e2e/docker-compose.yml b/e2e/docker-compose.yml index 6da437c..cb14a7d 100644 --- a/e2e/docker-compose.yml +++ b/e2e/docker-compose.yml @@ -21,7 +21,7 @@ services: ## Wallet Server ## ################### walletserver: - image: lbry/wallet-server:v0.42.0 + image: lbry/wallet-server:v0.42.2 restart: always environment: - DB_DIRECTORY=/database @@ -47,7 +47,7 @@ services: ## Lbrynet ## ############# lbrynet: - image: lbry/lbrynet:v0.42.0 + image: lbry/lbrynet:v0.42.2 restart: always ports: - "15100:5279" diff --git a/e2e/e2e.sh b/e2e/e2e.sh index 452906f..3eb59c1 100755 --- a/e2e/e2e.sh +++ b/e2e/e2e.sh @@ -53,7 +53,7 @@ echo "successfully started..." ./data_setup.sh # Execute the sync test! -./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container +./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 2 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container status=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM youtube_data WHERE id=1') videoStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM synced_video WHERE id=1') videoClaimID1=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1') @@ -71,7 +71,7 @@ mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e "UPDATE youtube_data # Trigger transfer api curl -i -H 'Accept: application/json' -H 'Content-Type: application/json' 'http://localhost:15400/yt/transfer?auth_token=youtubertoken&address=n4eYeXAYmHo4YRUDEfsEhucy8y5LKRMcHg&public_key=tpubDA9GDAntyJu4hD3wU7175p7CuV6DWbYXfyb2HedBA3yuBp9HZ4n3QE4Ex6RHCSiEuVp2nKAL1Lzf2ZLo9ApaFgNaJjG6Xo1wB3iEeVbrDZp' # Execute the transfer test! -./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container +./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 2 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container # Check that the channel and the video are marked as transferred and that all supports are spent channelTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transfer_state FROM youtube_data') videoTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transferred FROM synced_video') diff --git a/e2e/lbrynet/settings/daemon_settings.yml b/e2e/lbrynet/settings/daemon_settings.yml index 8a6bcb4..a9a4f59 100644 --- a/e2e/lbrynet/settings/daemon_settings.yml +++ b/e2e/lbrynet/settings/daemon_settings.yml @@ -6,6 +6,7 @@ lbryum_servers: - walletserver:50001 save_blobs: true save_files: false +reflect_streams: false #for the love of god, don't upload regtest streams to reflector! share_usage_data: false tcp_port: 3333 udp_port: 4444 diff --git a/manager/transfer.go b/manager/transfer.go index c155336..dc53ce3 100644 --- a/manager/transfer.go +++ b/manager/transfer.go @@ -2,13 +2,18 @@ package manager import ( "fmt" + "strconv" + "strings" + "time" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/lbryio/ytsync/sdk" + log "github.com/sirupsen/logrus" - "strconv" ) func waitConfirmations(s *Sync) error { @@ -40,6 +45,12 @@ waiting: return nil } +type abandonResponse struct { + ClaimID string + Error error + Amount float64 +} + func abandonSupports(s *Sync) (float64, error) { totalPages := uint64(1) var allSupports []jsonrpc.Claim @@ -55,31 +66,99 @@ func abandonSupports(s *Sync) (float64, error) { allSupports = append(allSupports, (*supports).Items...) totalPages = (*supports).TotalPages } + producerWG := &stop.Group{} + + claimIDChan := make(chan string, len(allSupports)) + abandonRspChan := make(chan abandonResponse, len(allSupports)) alreadyAbandoned := make(map[string]bool, len(allSupports)) + producerWG.Add(1) + go func() { + defer producerWG.Done() + for _, support := range allSupports { + _, ok := alreadyAbandoned[support.ClaimID] + if ok { + continue + } + alreadyAbandoned[support.ClaimID] = true + claimIDChan <- support.ClaimID + } + }() + consumerWG := &stop.Group{} + //TODO: remove this once the SDK team fixes their RPC bugs.... + s.daemon.SetRPCTimeout(30 * time.Second) + defer s.daemon.SetRPCTimeout(40 * time.Minute) + for i := 0; i < s.ConcurrentVideos; i++ { + consumerWG.Add(1) + go func() { + defer consumerWG.Done() + for { + claimID, more := <-claimIDChan + if !more { + return + } else { + summary, err := s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil) + if err != nil { + if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") { + log.Errorf("Support abandon for %s timed out, retrying...", claimID) + summary, err = s.daemon.SupportAbandon(&claimID, nil, nil, nil, nil) + if err != nil { + //TODO GUESS HOW MUCH LBC WAS RELEASED THAT WE DON'T KNOW ABOUT, because screw you SDK + abandonRspChan <- abandonResponse{ + ClaimID: claimID, + Error: err, + Amount: 0, // this is likely wrong, but oh well... there is literally nothing I can do about it + } + return + } + } else { + abandonRspChan <- abandonResponse{ + ClaimID: claimID, + Error: err, + Amount: 0, + } + return + } + } + if len(summary.Outputs) < 1 { + abandonRspChan <- abandonResponse{ + ClaimID: claimID, + Error: errors.Err("error abandoning supports: no outputs while abandoning %s", claimID), + Amount: 0, + } + return + } + outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64) + if err != nil { + abandonRspChan <- abandonResponse{ + ClaimID: claimID, + Error: errors.Err(err), + Amount: 0, + } + return + } + log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, claimID) + abandonRspChan <- abandonResponse{ + ClaimID: claimID, + Error: nil, + Amount: outputAmount, + } + return + } + } + }() + } + producerWG.Wait() + close(claimIDChan) + consumerWG.Wait() + close(abandonRspChan) + totalAbandoned := 0.0 - for _, support := range allSupports { - _, ok := alreadyAbandoned[support.ClaimID] - if ok { + for r := range abandonRspChan { + if r.Error != nil { + log.Errorf("Failed abandoning supports for %s: %s", r.ClaimID, r.Error.Error()) continue } - supportOnTransferredClaim := support.Address == s.clientPublishAddress //todo: probably not needed anymore - if supportOnTransferredClaim { - continue - } - alreadyAbandoned[support.ClaimID] = true - summary, err := s.daemon.SupportAbandon(&support.ClaimID, nil, nil, nil, nil) - if err != nil { - return totalAbandoned, errors.Err(err) - } - if len(summary.Outputs) < 1 { - return totalAbandoned, errors.Err("error abandoning supports: no outputs while abandoning %s", support.ClaimID) - } - outputAmount, err := strconv.ParseFloat(summary.Outputs[0].Amount, 64) - if err != nil { - return totalAbandoned, errors.Err(err) - } - totalAbandoned += outputAmount - log.Infof("Abandoned supports of %.4f LBC for claim %s", outputAmount, support.ClaimID) + totalAbandoned += r.Amount } return totalAbandoned, nil } @@ -105,6 +184,7 @@ func transferVideos(s *Sync) error { producerWG := &stop.Group{} producerWG.Add(1) go func() { + defer producerWG.Done() for _, video := range s.syncedVideos { if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion { continue @@ -142,7 +222,6 @@ func transferVideos(s *Sync) error { videoStatus: &videoStatus, } } - producerWG.Done() }() consumerWG := &stop.Group{}