package local import ( "errors" "sort" "time" log "github.com/sirupsen/logrus" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/util" ) type LocalSDKPublisher struct { channelID string publishBid float64 lbrynet *jsonrpc.Client } func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) { lbrynet := jsonrpc.NewClient(sdkAddr) lbrynet.SetRPCTimeout(5 * time.Minute) status, err := lbrynet.Status() if err != nil { return nil, err } if !status.IsRunning { return nil, errors.New("SDK is not running") } // Should check to see if the SDK owns the channel // Should check to see if wallet is unlocked // but jsonrpc.Client doesn't have WalletStatus method // so skip for now // Should check to see if streams are configured to be reflected and warn if not // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected // so use File.UploadingToReflector as a proxy for now publisher := LocalSDKPublisher { channelID: channelID, publishBid: publishBid, lbrynet: lbrynet, } return &publisher, nil } func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (string, <-chan error, error) { streamCreateOptions := jsonrpc.StreamCreateOptions { ClaimCreateOptions: jsonrpc.ClaimCreateOptions { Title: &video.Title, Description: &video.Description, Languages: video.Languages, ThumbnailURL: &video.ThumbnailURL, Tags: video.Tags, }, ReleaseTime: &video.ReleaseTime, ChannelID: &p.channelID, License: util.PtrToString("Copyrighted (contact publisher)"), } txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) if err != nil { return "", nil, err } var claimID *string for _, output := range txSummary.Outputs { if output.Type == "claim" { claimID = &output.ClaimID break } } if claimID == nil { return "", nil, errors.New("Publish transaction did not have a claim output.") } if !reflectStream { return "", nil, nil } done := make(chan error, 1) go func() { for { fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid) if err != nil { log.Errorf("Error finding file by txid: %v", err) done <- err return } if fileListResponse == nil { log.Errorf("Could not find file in list with correct txid") done <- err return } fileStatus := fileListResponse.Items[fileIndex] if fileStatus.IsFullyReflected { log.Info("Stream is fully reflected") break } if !fileStatus.UploadingToReflector { log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") done <- errors.New("Stream is not being reflected (check lbrynet settings).") return } log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) time.Sleep(5 * time.Second) } done <- nil }() return *claimID, done, nil } func (p *LocalSDKPublisher) PublishedVideoIterator(sinceTimestamp int64) <-chan PublishedVideoIteratorResult { videoCh := make(chan PublishedVideoIteratorResult, 10) go func() { defer close(videoCh) for page := uint64(0); ; page++ { streams, err := p.lbrynet.StreamList(nil, page, 100) if err != nil { log.Errorf("Error listing streams (page %d): %v", page, err) errResult := PublishedVideoIteratorResult { Error: err, } videoCh <- errResult return } if len(streams.Items) == 0 { return } for _, stream := range streams.Items { if stream.ChannelID != p.channelID || stream.Value.GetStream().ReleaseTime < sinceTimestamp { continue } languages := []string{} for _, language := range stream.Value.Languages { languages = append(languages, language.String()) } video := PublishedVideo { ClaimID: stream.ClaimID, NativeID: "", Source: "", ClaimName: stream.Name, Title: stream.Value.Title, Description: stream.Value.Description, Languages: languages, Tags: stream.Value.Tags, ReleaseTime: stream.Value.GetStream().ReleaseTime, ThumbnailURL: stream.Value.Thumbnail.Url, FullLocalPath: "", } videoResult := PublishedVideoIteratorResult { Video: &video, } videoCh <- videoResult } } }() return videoCh } // if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { response, err := client.FileList(0, 20) for { if err != nil { log.Errorf("Error getting file list page: %v", err) return nil, 0, err } index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) if index < len(response.Items) { return response, index, nil } if response.Page >= response.TotalPages { return nil, 0, nil } response, err = client.FileList(response.Page + 1, 20) } }