make video transferring async

fix bug in videosLimit
change test subject to have more videos
This commit is contained in:
Niko Storni 2019-10-11 00:57:42 +02:00
parent b409d36de0
commit dd4aebdba4
5 changed files with 107 additions and 68 deletions

3
.gitignore vendored
View file

@ -2,4 +2,5 @@ bin/
e2e/persist e2e/persist
e2e/supporty/supporty e2e/supporty/supporty
.env .env
blobsfiles blobsfiles
ytsync_docker

View file

@ -20,5 +20,5 @@ ADDYTSYNCAUTHTOKEN='INSERT INTO auth_token (user_id, value) VALUE(2,"youtubertok
mysql -u lbry -plbry -D lbry -h "127.0.0.1" -P 15500 -e "$ADDYTSYNCAUTHTOKEN" mysql -u lbry -plbry -D lbry -h "127.0.0.1" -P 15500 -e "$ADDYTSYNCAUTHTOKEN"
#Add their youtube channel to be synced #Add their youtube channel to be synced
ADDYTCHANNEL="INSERT INTO youtube_data (user_id, status_token,desired_lbry_channel,channel_id,channel_name,status,created_at,source,total_videos,total_subscribers) ADDYTCHANNEL="INSERT INTO youtube_data (user_id, status_token,desired_lbry_channel,channel_id,channel_name,status,created_at,source,total_videos,total_subscribers)
VALUE(2,'3qzGyuVjQaf7t4pKKu2Er1NRW2LJkeWw','@beamertest','UCCyr5j8akeu9j4Q7urV0Lqw','BeamerAtLBRY','queued','2019-08-01 00:00:00','sync',1,0)" VALUE(2,'3qzGyuVjQaf7t4pKKu2Er1NRW2LJkeWw','@beamertest','UCNQfQvFMPnInwsU_iGYArJQ','BeamerAtLBRY','queued','2019-08-01 00:00:00','sync',1,0)"
mysql -u lbry -plbry -D lbry -h "127.0.0.1" -P 15500 -e "$ADDYTCHANNEL" mysql -u lbry -plbry -D lbry -h "127.0.0.1" -P 15500 -e "$ADDYTCHANNEL"

View file

@ -53,26 +53,28 @@ echo "successfully started..."
./data_setup.sh ./data_setup.sh
# Execute the sync test! # Execute the sync test!
./../bin/ytsync --channelID UCCyr5j8akeu9j4Q7urV0Lqw #Force channel intended...just in case. This channel lines up with the api container ./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container
status=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM youtube_data WHERE id=1') status=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM youtube_data WHERE id=1')
videoStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM synced_video WHERE id=1') videoStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT status FROM synced_video WHERE id=1')
videoClaimID=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1') videoClaimID1=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1')
videoClaimAddress=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT claim_address FROM claim WHERE id=2') videoClaimID2=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT claim_id FROM synced_video WHERE id=1')
videoClaimAddress1=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT claim_address FROM claim WHERE id=2')
videoClaimAddress2=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT claim_address FROM claim WHERE id=2')
# Create Supports for published claim # Create Supports for published claim
./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 1.0 ./supporty/supporty @BeamerTest "${videoClaimID1}" "${videoClaimAddress1}" lbrycrd_regtest 1.0
./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 2.0 ./supporty/supporty @BeamerTest "${videoClaimID2}" "${videoClaimAddress2}" lbrycrd_regtest 2.0
./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 3.0 ./supporty/supporty @BeamerTest "${videoClaimID1}" "${videoClaimAddress1}" lbrycrd_regtest 3.0
./supporty/supporty @BeamerTest "${videoClaimID}" "${videoClaimAddress}" lbrycrd_regtest 3.0 ./supporty/supporty @BeamerTest "${videoClaimID2}" "${videoClaimAddress2}" lbrycrd_regtest 3.0
curl --data-binary '{"jsonrpc":"1.0","id":"curltext","method":"generate","params":[1]}' -H 'content-type:text/plain;' --user lbry:lbry http://localhost:15200 curl --data-binary '{"jsonrpc":"1.0","id":"curltext","method":"generate","params":[1]}' -H 'content-type:text/plain;' --user lbry:lbry http://localhost:15200
# Reset status for tranfer test # Reset status for tranfer test
mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e "UPDATE youtube_data SET status = 'queued' WHERE id = 1" mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e "UPDATE youtube_data SET status = 'queued' WHERE id = 1"
# Trigger transfer api # Trigger transfer api
curl -i -H 'Accept: application/json' -H 'Content-Type: application/json' 'http://localhost:15400/yt/transfer?auth_token=youtubertoken&address=n4eYeXAYmHo4YRUDEfsEhucy8y5LKRMcHg&public_key=tpubDA9GDAntyJu4hD3wU7175p7CuV6DWbYXfyb2HedBA3yuBp9HZ4n3QE4Ex6RHCSiEuVp2nKAL1Lzf2ZLo9ApaFgNaJjG6Xo1wB3iEeVbrDZp' curl -i -H 'Accept: application/json' -H 'Content-Type: application/json' 'http://localhost:15400/yt/transfer?auth_token=youtubertoken&address=n4eYeXAYmHo4YRUDEfsEhucy8y5LKRMcHg&public_key=tpubDA9GDAntyJu4hD3wU7175p7CuV6DWbYXfyb2HedBA3yuBp9HZ4n3QE4Ex6RHCSiEuVp2nKAL1Lzf2ZLo9ApaFgNaJjG6Xo1wB3iEeVbrDZp'
# Execute the transfer test! # Execute the transfer test!
./../bin/ytsync --channelID UCCyr5j8akeu9j4Q7urV0Lqw #Force channel intended...just in case. This channel lines up with the api container ./../bin/ytsync --channelID UCNQfQvFMPnInwsU_iGYArJQ --videos-limit 1 --concurrent-jobs 4 #Force channel intended...just in case. This channel lines up with the api container
# Check that the channel and the video are marked as transferred and that all supports are spent # Check that the channel and the video are marked as transferred and that all supports are spent
channelTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT transfer_state FROM youtube_data WHERE id=1') channelTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transfer_state FROM youtube_data')
videoTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT transferred FROM synced_video WHERE id=1') videoTransferStatus=$(mysql -u lbry -plbry -ss -D lbry -h "127.0.0.1" -P 15500 -e 'SELECT distinct transferred FROM synced_video')
nrUnspentSupports=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT COUNT(*) FROM chainquery.support INNER JOIN output ON output.transaction_hash = support.transaction_hash_id AND output.vout = support.vout WHERE output.is_spent = 0') nrUnspentSupports=$(mysql -u lbry -plbry -ss -D chainquery -h "127.0.0.1" -P 15600 -e 'SELECT COUNT(*) FROM chainquery.support INNER JOIN output ON output.transaction_hash = support.transaction_hash_id AND output.vout = support.vout WHERE output.is_spent = 0')
if [[ $status != "synced" || $videoStatus != "published" || $channelTransferStatus != "2" || $videoTransferStatus != "1" || $nrUnspentSupports != "1" ]]; then if [[ $status != "synced" || $videoStatus != "published" || $channelTransferStatus != "2" || $videoTransferStatus != "1" || $nrUnspentSupports != "1" ]]; then
echo "~~!!!~~~FAILED~~~!!!~~" echo "~~!!!~~~FAILED~~~!!!~~"

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/sdk" "github.com/lbryio/ytsync/sdk"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -83,77 +84,112 @@ func abandonSupports(s *Sync) (float64, error) {
return totalAbandoned, nil return totalAbandoned, nil
} }
type updateInfo struct {
ClaimID string
streamUpdateOptions *jsonrpc.StreamUpdateOptions
videoStatus *sdk.VideoStatus
}
func transferVideos(s *Sync) error { func transferVideos(s *Sync) error {
cleanTransfer := true cleanTransfer := true
for _, video := range s.syncedVideos {
if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion {
//log.Debugf("skipping video: %s", video.VideoID)
continue
}
//Todo - Wait for prior sync to see that the publish is confirmed in lbrycrd? streamChan := make(chan updateInfo, s.ConcurrentVideos)
account, err := s.getDefaultAccount() account, err := s.getDefaultAccount()
if err != nil { if err != nil {
return err return err
} }
streams, err := s.daemon.StreamList(&account) streams, err := s.daemon.StreamList(&account)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
var stream *jsonrpc.Claim = nil producerWG := &stop.Group{}
for _, c := range *streams { producerWG.Add(1)
if c.ClaimID != video.ClaimID { go func() {
for _, video := range s.syncedVideos {
if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion {
continue continue
} }
stream = &c
break
}
if stream == nil {
return nil
}
streamUpdateOptions := jsonrpc.StreamUpdateOptions{ var stream *jsonrpc.Claim = nil
StreamCreateOptions: &jsonrpc.StreamCreateOptions{ for _, c := range *streams {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ClaimAddress: &s.clientPublishAddress}, if c.ClaimID != video.ClaimID {
}, continue
Bid: util.PtrToString("0.005"), // Todo - Dont hardcode }
} stream = &c
break
}
if stream == nil {
return
}
videoStatus := sdk.VideoStatus{ streamUpdateOptions := jsonrpc.StreamUpdateOptions{
ChannelID: s.YoutubeChannelID, StreamCreateOptions: &jsonrpc.StreamCreateOptions{
VideoID: video.VideoID, ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ClaimAddress: &s.clientPublishAddress},
ClaimID: video.ClaimID, },
ClaimName: video.ClaimName, Bid: util.PtrToString("0.005"), // Todo - Dont hardcode
Status: VideoStatusPublished, }
IsTransferred: util.PtrToBool(true), videoStatus := sdk.VideoStatus{
ChannelID: s.YoutubeChannelID,
VideoID: video.VideoID,
ClaimID: video.ClaimID,
ClaimName: video.ClaimName,
Status: VideoStatusPublished,
IsTransferred: util.PtrToBool(true),
}
streamChan <- updateInfo{
ClaimID: video.ClaimID,
streamUpdateOptions: &streamUpdateOptions,
videoStatus: &videoStatus,
}
} }
producerWG.Done()
}()
result, updateError := s.daemon.StreamUpdate(video.ClaimID, streamUpdateOptions) consumerWG := &stop.Group{}
if updateError != nil { for i := 0; i < s.ConcurrentVideos; i++ {
cleanTransfer = false consumerWG.Add(1)
videoStatus.FailureReason = updateError.Error() go func(worker int) {
videoStatus.Status = VideoStatusTranferFailed defer consumerWG.Done()
videoStatus.IsTransferred = util.PtrToBool(false) for {
} else { ui, more := <-streamChan
videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0) if !more {
} return
log.Infof("TRANSFERRED %t", *videoStatus.IsTransferred) } else {
statusErr := s.APIConfig.MarkVideoStatus(videoStatus) err := s.streamUpdate(&ui)
if statusErr != nil { if err != nil {
return errors.Prefix(statusErr.Error(), updateError) cleanTransfer = false
} }
if updateError != nil { }
return errors.Err(updateError) }
} }(i)
} }
// Todo - Transfer Channel as last step and post back to remote db that channel is transferred. producerWG.Wait()
//Transfer channel close(streamChan)
consumerWG.Wait()
if !cleanTransfer { if !cleanTransfer {
return errors.Err("A video has failed to transfer for the channel...skipping channel transfer") return errors.Err("A video has failed to transfer for the channel...skipping channel transfer")
} }
return nil return nil
} }
func (s *Sync) streamUpdate(ui *updateInfo) error {
result, updateError := s.daemon.StreamUpdate(ui.ClaimID, *ui.streamUpdateOptions)
if updateError != nil {
ui.videoStatus.FailureReason = updateError.Error()
ui.videoStatus.Status = VideoStatusTranferFailed
ui.videoStatus.IsTransferred = util.PtrToBool(false)
} else {
ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0)
}
log.Infof("TRANSFERRED %t", *ui.videoStatus.IsTransferred)
statusErr := s.APIConfig.MarkVideoStatus(*ui.videoStatus)
if statusErr != nil {
return errors.Prefix(statusErr.Error(), updateError)
}
return errors.Err(updateError)
}
func transferChannel(s *Sync) error { func transferChannel(s *Sync) error {
account, err := s.getDefaultAccount() account, err := s.getDefaultAccount()
if err != nil { if err != nil {

View file

@ -1094,7 +1094,7 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
if !videoRequiresUpgrade && v.PlaylistPosition() > s.Manager.videosLimit { if !videoRequiresUpgrade && v.PlaylistPosition() >= s.Manager.videosLimit {
log.Println(v.ID() + " is old: skipping") log.Println(v.ID() + " is old: skipping")
return nil return nil
} }