diff --git a/.gitignore b/.gitignore index a0067ed..5d95aa7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ bin/ e2e/persist e2e/supporty/supporty .env -blobsfiles \ No newline at end of file +blobsfiles +ytsync_docker diff --git a/e2e/data_setup.sh b/e2e/data_setup.sh index a1ad9a1..e54ce69 100755 --- a/e2e/data_setup.sh +++ b/e2e/data_setup.sh @@ -20,5 +20,5 @@ ADDYTSYNCAUTHTOKEN='INSERT INTO auth_token (user_id, value) VALUE(2,"youtubertok mysql -u lbry -plbry -D lbry -h "127.0.0.1" -P 15500 -e "$ADDYTSYNCAUTHTOKEN" #Add their youtube channel to be synced ADDYTCHANNEL="INSERT INTO youtube_data (user_id, status_token,desired_lbry_channel,channel_id,channel_name,status,created_at,source,total_videos,total_subscribers) -VALUE(2,'3qzGyuVjQaf7t4pKKu2Er1NRW2LJkeWw','@beamertest','UCCyr5j8akeu9j4Q7urV0Lqw','BeamerAtLBRY','queued','2019-08-01 00:00:00','sync',1,0)" +VALUE(2,'3qzGyuVjQaf7t4pKKu2Er1NRW2LJkeWw','@beamertest','UCNQfQvFMPnInwsU_iGYArJQ','BeamerAtLBRY','queued','2019-08-01 00:00:00','sync',1,0)" mysql -u lbry -plbry -D lbry -h "127.0.0.1" -P 15500 -e "$ADDYTCHANNEL" diff --git a/e2e/e2e.sh b/e2e/e2e.sh index 6208818..452906f 100755 --- a/e2e/e2e.sh +++ b/e2e/e2e.sh @@ -53,26 +53,28 @@ echo "successfully started..." ./data_setup.sh # Execute the sync test! -./../bin/ytsync --channelID UCCyr5j8akeu9j4Q7urV0Lqw #Force channel intended...just in case. This channel lines up with the api container +./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container status=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM youtube_data WHERE id=1') videoStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM synced_video WHERE id=1') -videoClaimID=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1') -videoClaimAddress=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT claim_address FROM claim WHERE id=2') +videoClaimID1=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1') +videoClaimID2=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1') +videoClaimAddress1=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT claim_address FROM claim WHERE id=2') +videoClaimAddress2=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT claim_address FROM claim WHERE id=2') # Create Supports for published claim -./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 1.0 -./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 2.0 -./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 3.0 -./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 3.0 +./supporty/supporty @BeamerTest "${videoClaimID1}" "${videoClaimAddress1}" lbrycrd_regtest 1.0 +./supporty/supporty @BeamerTest "${videoClaimID2}" "${videoClaimAddress2}" lbrycrd_regtest 2.0 +./supporty/supporty @BeamerTest "${videoClaimID1}" "${videoClaimAddress1}" lbrycrd_regtest 3.0 +./supporty/supporty @BeamerTest "${videoClaimID2}" "${videoClaimAddress2}" lbrycrd_regtest 3.0 curl --data-binary '{"jsonrpc":"1.0","id":"curltext","method":"generate","params":[1]}' -H 'content-type:text/plain;' --user lbry:lbry http://localhost:15200 # Reset status for tranfer test mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e "UPDATE youtube_data SET status = 'queued' WHERE id = 1" # Trigger transfer api curl -i -H 'Accept: application/json' -H 'Content-Type: application/json' 'http://localhost:15400/yt/transfer?auth_token=youtubertoken&address=n4eYeXAYmHo4YRUDEfsEhucy8y5LKRMcHg&public_key=tpubDA9GDAntyJu4hD3wU7175p7CuV6DWbYXfyb2HedBA3yuBp9HZ4n3QE4Ex6RHCSiEuVp2nKAL1Lzf2ZLo9ApaFgNaJjG6Xo1wB3iEeVbrDZp' # Execute the transfer test! -./../bin/ytsync --channelID UCCyr5j8akeu9j4Q7urV0Lqw #Force channel intended...just in case. This channel lines up with the api container +./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container # Check that the channel and the video are marked as transferred and that all supports are spent -channelTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT transfer_state FROM youtube_data WHERE id=1') -videoTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT transferred FROM synced_video WHERE id=1') +channelTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transfer_state FROM youtube_data') +videoTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transferred FROM synced_video') nrUnspentSupports=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT COUNT(*) FROM chainquery.support INNER JOIN output ON output.transaction_hash = support.transaction_hash_id AND output.vout = support.vout WHERE output.is_spent = 0') if [[ $status != "synced" || $videoStatus != "published" || $channelTransferStatus != "2" || $videoTransferStatus != "1" || $nrUnspentSupports != "1" ]]; then echo "~~!!!~~~FAILED~~~!!!~~" diff --git a/manager/transfer.go b/manager/transfer.go index fdda1f8..c155336 100644 --- a/manager/transfer.go +++ b/manager/transfer.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/ytsync/sdk" log "github.com/sirupsen/logrus" @@ -83,77 +84,112 @@ func abandonSupports(s *Sync) (float64, error) { return totalAbandoned, nil } +type updateInfo struct { + ClaimID string + streamUpdateOptions *jsonrpc.StreamUpdateOptions + videoStatus *sdk.VideoStatus +} + func transferVideos(s *Sync) error { cleanTransfer := true - for _, video := range s.syncedVideos { - if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion { - //log.Debugf("skipping video: %s", video.VideoID) - continue - } - //Todo - Wait for prior sync to see that the publish is confirmed in lbrycrd? - account, err := s.getDefaultAccount() - if err != nil { - return err - } - streams, err := s.daemon.StreamList(&account) - if err != nil { - return errors.Err(err) - } - var stream *jsonrpc.Claim = nil - for _, c := range *streams { - if c.ClaimID != video.ClaimID { + streamChan := make(chan updateInfo, s.ConcurrentVideos) + account, err := s.getDefaultAccount() + if err != nil { + return err + } + streams, err := s.daemon.StreamList(&account) + if err != nil { + return errors.Err(err) + } + producerWG := &stop.Group{} + producerWG.Add(1) + go func() { + for _, video := range s.syncedVideos { + if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion { continue } - stream = &c - break - } - if stream == nil { - return nil - } - streamUpdateOptions := jsonrpc.StreamUpdateOptions{ - StreamCreateOptions: &jsonrpc.StreamCreateOptions{ - ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ClaimAddress: &s.clientPublishAddress}, - }, - Bid: util.PtrToString("0.005"), // Todo - Dont hardcode - } + var stream *jsonrpc.Claim = nil + for _, c := range *streams { + if c.ClaimID != video.ClaimID { + continue + } + stream = &c + break + } + if stream == nil { + return + } - videoStatus := sdk.VideoStatus{ - ChannelID: s.YoutubeChannelID, - VideoID: video.VideoID, - ClaimID: video.ClaimID, - ClaimName: video.ClaimName, - Status: VideoStatusPublished, - IsTransferred: util.PtrToBool(true), + streamUpdateOptions := jsonrpc.StreamUpdateOptions{ + StreamCreateOptions: &jsonrpc.StreamCreateOptions{ + ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ClaimAddress: &s.clientPublishAddress}, + }, + Bid: util.PtrToString("0.005"), // Todo - Dont hardcode + } + videoStatus := sdk.VideoStatus{ + ChannelID: s.YoutubeChannelID, + VideoID: video.VideoID, + ClaimID: video.ClaimID, + ClaimName: video.ClaimName, + Status: VideoStatusPublished, + IsTransferred: util.PtrToBool(true), + } + streamChan <- updateInfo{ + ClaimID: video.ClaimID, + streamUpdateOptions: &streamUpdateOptions, + videoStatus: &videoStatus, + } } + producerWG.Done() + }() - result, updateError := s.daemon.StreamUpdate(video.ClaimID, streamUpdateOptions) - if updateError != nil { - cleanTransfer = false - videoStatus.FailureReason = updateError.Error() - videoStatus.Status = VideoStatusTranferFailed - videoStatus.IsTransferred = util.PtrToBool(false) - } else { - videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0) - } - log.Infof("TRANSFERRED %t", *videoStatus.IsTransferred) - statusErr := s.APIConfig.MarkVideoStatus(videoStatus) - if statusErr != nil { - return errors.Prefix(statusErr.Error(), updateError) - } - if updateError != nil { - return errors.Err(updateError) - } + consumerWG := &stop.Group{} + for i := 0; i < s.ConcurrentVideos; i++ { + consumerWG.Add(1) + go func(worker int) { + defer consumerWG.Done() + for { + ui, more := <-streamChan + if !more { + return + } else { + err := s.streamUpdate(&ui) + if err != nil { + cleanTransfer = false + } + } + } + }(i) } - // Todo - Transfer Channel as last step and post back to remote db that channel is transferred. - //Transfer channel + producerWG.Wait() + close(streamChan) + consumerWG.Wait() + if !cleanTransfer { return errors.Err("A video has failed to transfer for the channel...skipping channel transfer") } return nil } +func (s *Sync) streamUpdate(ui *updateInfo) error { + result, updateError := s.daemon.StreamUpdate(ui.ClaimID, *ui.streamUpdateOptions) + if updateError != nil { + ui.videoStatus.FailureReason = updateError.Error() + ui.videoStatus.Status = VideoStatusTranferFailed + ui.videoStatus.IsTransferred = util.PtrToBool(false) + } else { + ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0) + } + log.Infof("TRANSFERRED %t", *ui.videoStatus.IsTransferred) + statusErr := s.APIConfig.MarkVideoStatus(*ui.videoStatus) + if statusErr != nil { + return errors.Prefix(statusErr.Error(), updateError) + } + return errors.Err(updateError) +} + func transferChannel(s *Sync) error { account, err := s.getDefaultAccount() if err != nil { diff --git a/manager/ytsync.go b/manager/ytsync.go index af52d7b..18b2fb8 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -1094,7 +1094,7 @@ func (s *Sync) processVideo(v video) (err error) { return nil } - if !videoRequiresUpgrade && v.PlaylistPosition() > s.Manager.videosLimit { + if !videoRequiresUpgrade && v.PlaylistPosition() >= s.Manager.videosLimit { log.Println(v.ID() + " is old: skipping") return nil }