Refactor to support future direction of development
This commit is contained in:
4 changed files with 481 additions and 201 deletions
@ -5,12 +5,9 @@ import (
log ""
@ -20,9 +17,6 @@ import (
type SyncContext struct {
@ -80,122 +74,45 @@ func localCmd(cmd *cobra.Command, args []string) {
videoID := args[0]
lbrynet := jsonrpc.NewClient(syncContext.LbrynetAddr)
lbrynet.SetRPCTimeout(5 * time.Minute)
log.Debugf("Running sync for YouTube video ID %s", videoID)
status, err := lbrynet.Status()
var publisher VideoPublisher
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
if err != nil {
log.Errorf("Error setting up publisher: %v", err)
if !status.IsRunning {
log.Error("SDK is not running")
var videoSource VideoSource
videoSource, err = NewYtdlVideoSource(syncContext.TempDir)
if err != nil {
log.Errorf("Error setting up video source: %v", err)
// Should check to see if the SDK owns the channel
// Should check to see if wallet is unlocked
// but jsonrpc.Client doesn't have WalletStatus method
// so skip for now
// Should check to see if streams are configured to be reflected and warn if not
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
// so use File.UploadingToReflector as a proxy for now
videoBasePath := path.Join(syncContext.TempDir, videoID)
videoMetadata, videoMetadataPath, err := getVideoMetadata(videoBasePath, videoID)
sourceVideo, err := videoSource.GetVideo(videoID)
if err != nil {
log.Errorf("Error getting video metadata: %v", err)
log.Errorf("Error getting source video: %v", err)
err = downloadVideo(videoBasePath, videoMetadataPath)
processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
if err != nil {
log.Errorf("Error downloading video: %v", err)
log.Errorf("Error processing source video for publishing: %v", err)
tags, err := tags_manager.SanitizeTags(videoMetadata.Tags, syncContext.ChannelID)
done, err := publisher.Publish(*processedVideo)
if err != nil {
log.Errorf("Error sanitizing tags: %v", err)
log.Errorf("Error publishing video: %v", err)
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
descriptionSample := urlsRegex.ReplaceAllString(videoMetadata.Description, "")
info := whatlanggo.Detect(descriptionSample)
info2 := whatlanggo.Detect(videoMetadata.Title)
var languages []string = nil
if info.IsReliable() && info.Lang.Iso6391() != "" {
language := info.Lang.Iso6391()
languages = []string{language}
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
language := info2.Lang.Iso6391()
languages = []string{language}
// Thumbnail and ReleaseTime need to be properly determined
streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &videoMetadata.Title,
Description: util.PtrToString(getAbbrevDescription(videoMetadata)),
Languages: languages,
//ThumbnailURL: &v.thumbnailURL,
Tags: tags,
ReleaseTime: util.PtrToInt64(time.Now().Unix()),
ChannelID: &syncContext.ChannelID,
License: util.PtrToString("Copyrighted (contact publisher)"),
videoPath, err := getVideoDownloadedPath(syncContext.TempDir, videoID)
err = <-done
if err != nil {
log.Errorf("Error determining downloaded video path: %v", err)
log.Errorf("Error while wating for stream to reflect: %v", err)
fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Title)
fmt.Println("%s", *streamCreateOptions.ClaimCreateOptions.Description)
fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Languages)
fmt.Println("%v", streamCreateOptions.ClaimCreateOptions.Tags)
claimName := namer.NewNamer().GetNextName(videoMetadata.Title)
log.Infof("Publishing stream as %s", claimName)
txSummary, err := lbrynet.StreamCreate(claimName, videoPath, syncContext.PublishBid, streamCreateOptions)
if err != nil {
log.Errorf("Error creating stream: %v", err)
for {
fileListResponse, fileIndex, err := findFileByTxid(lbrynet, txSummary.Txid)
if err != nil {
log.Errorf("Error finding file by txid: %v", err)
if fileListResponse == nil {
log.Errorf("Could not find file in list with correct txid")
fileStatus := fileListResponse.Items[fileIndex]
if fileStatus.IsFullyReflected {
log.Info("Stream is fully reflected")
if !fileStatus.UploadingToReflector {
log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
time.Sleep(5 * time.Second)
@ -239,88 +156,6 @@ func loadVideoMetadata(path string) (*ytdl.YtdlVideo, error) {
return videoMetadata, nil
func downloadVideoMetadata(basePath, videoID string) error {
ytdlArgs := []string{
fmt.Sprintf("", videoID),
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
return err
func downloadVideo(basePath, metadataPath string) error {
ytdlArgs := []string{
"ffmpeg:-movflags faststart",
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
return err
func runCmd(cmd *exec.Cmd) ([]string, error) {
log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
var err error
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
err = cmd.Start()
if err != nil {
return nil, err
outLog, err := ioutil.ReadAll(stdout)
if err != nil {
return nil, err
errorLog, err := ioutil.ReadAll(stderr)
if err != nil {
return nil, err
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
select {
case err := <-done:
if err != nil {
return nil, err
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
func getVideoDownloadedPath(videoDir, videoID string) (string, error) {
files, err := ioutil.ReadDir(videoDir)
if err != nil {
@ -339,31 +174,100 @@ func getVideoDownloadedPath(videoDir, videoID string) (string, error) {
func getAbbrevDescription(v *ytdl.YtdlVideo) string {
func getAbbrevDescription(v SourceVideo) string {
if v.Description == nil {
return v.SourceURL
maxLength := 2800
description := strings.TrimSpace(v.Description)
additionalDescription := "\n" + v.ID
description := strings.TrimSpace(*v.Description)
additionalDescription := "\n" + v.SourceURL
if len(description) > maxLength {
description = description[:maxLength]
return description + "\n..." + additionalDescription
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20)
for {
if err != nil {
log.Errorf("Error getting file list page: %v", err)
return nil, 0, err
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
if index < len(response.Items) {
return response, index, nil
if response.Page >= response.TotalPages {
return nil, 0, nil
response, err = client.FileList(response.Page + 1, 20)
type SourceVideo struct {
ID string
Title *string
Description *string
SourceURL string
Languages []string
Tags []string
ReleaseTime *int64
ThumbnailURL *string
FullLocalPath string
type PublishableVideo struct {
ID string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
if err != nil {
log.Errorf("Error sanitizing tags: %v", err)
return nil, err
descriptionSample := ""
if source.Description != nil {
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
info := whatlanggo.Detect(descriptionSample)
title := ""
if source.Title != nil {
title = *source.Title
info2 := whatlanggo.Detect(title)
var languages []string = nil
if info.IsReliable() && info.Lang.Iso6391() != "" {
language := info.Lang.Iso6391()
languages = []string{language}
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
language := info2.Lang.Iso6391()
languages = []string{language}
claimName := namer.NewNamer().GetNextName(title)
thumbnailURL := ""
if source.ThumbnailURL != nil {
thumbnailURL = *source.ThumbnailURL
processed := PublishableVideo {
ClaimName: claimName,
Title: title,
Description: getAbbrevDescription(source),
Languages: languages,
Tags: tags,
ReleaseTime: *source.ReleaseTime,
ThumbnailURL: thumbnailURL,
FullLocalPath: source.FullLocalPath,
log.Debugf("Video prepared for publication: %v", processed)
return &processed, nil
type VideoSource interface {
GetVideo(id string) (*SourceVideo, error)
type VideoPublisher interface {
Publish(video PublishableVideo) (chan error, error)
Normal file
Normal file
@ -0,0 +1,120 @@
package local
import (
log ""
type LocalSDKPublisher struct {
channelID string
publishBid float64
lbrynet *jsonrpc.Client
func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
lbrynet := jsonrpc.NewClient(sdkAddr)
lbrynet.SetRPCTimeout(5 * time.Minute)
status, err := lbrynet.Status()
if err != nil {
return nil, err
if !status.IsRunning {
return nil, errors.New("SDK is not running")
// Should check to see if the SDK owns the channel
// Should check to see if wallet is unlocked
// but jsonrpc.Client doesn't have WalletStatus method
// so skip for now
// Should check to see if streams are configured to be reflected and warn if not
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
// so use File.UploadingToReflector as a proxy for now
publisher := LocalSDKPublisher {
channelID: channelID,
publishBid: publishBid,
lbrynet: lbrynet,
return &publisher, nil
func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) {
streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &video.Title,
Description: &video.Description,
Languages: video.Languages,
ThumbnailURL: &video.ThumbnailURL,
Tags: video.Tags,
ReleaseTime: &video.ReleaseTime,
ChannelID: &p.channelID,
License: util.PtrToString("Copyrighted (contact publisher)"),
txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
if err != nil {
return nil, err
done := make(chan error, 1)
go func() {
for {
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
if err != nil {
log.Errorf("Error finding file by txid: %v", err)
done <- err
if fileListResponse == nil {
log.Errorf("Could not find file in list with correct txid")
done <- err
fileStatus := fileListResponse.Items[fileIndex]
if fileStatus.IsFullyReflected {
log.Info("Stream is fully reflected")
if !fileStatus.UploadingToReflector {
log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
time.Sleep(5 * time.Second)
done <- nil
return done, nil
// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20)
for {
if err != nil {
log.Errorf("Error getting file list page: %v", err)
return nil, 0, err
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
if index < len(response.Items) {
return response, index, nil
if response.Page >= response.TotalPages {
return nil, 0, nil
response, err = client.FileList(response.Page + 1, 20)
Normal file
Normal file
@ -0,0 +1,202 @@
package local
import (
log ""
type Ytdl struct {
DownloadDir string
func NewYtdl(downloadDir string) (*Ytdl, error) {
// TODO validate download dir
y := Ytdl {
DownloadDir: downloadDir,
return &y, nil
func (y *Ytdl) GetVideoMetadata(videoID string) (*ytdl.YtdlVideo, error) {
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
return nil, err
metadataBytes, err := os.ReadFile(metadataPath)
if err != nil {
return nil, err
var metadata *ytdl.YtdlVideo
err = json.Unmarshal(metadataBytes, &metadata)
if err != nil {
return nil, err
return metadata, nil
func (y *Ytdl) GetVideoMetadataFile(videoID string) (string, error) {
basePath := path.Join(y.DownloadDir, videoID)
metadataPath := basePath + ".info.json"
_, err := os.Stat(metadataPath)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Error determining if video metadata already exists: %v", err)
return "", err
} else if err != nil {
log.Debugf("Metadata file for video %s does not exist. Downloading now.", videoID)
err = downloadVideoMetadata(basePath, videoID)
if err != nil {
return "", err
return metadataPath, nil
func (y *Ytdl) GetVideoFile(videoID string) (string, error) {
videoPath, err := findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
return "", err
if videoPath != nil {
return *videoPath, nil
basePath := path.Join(y.DownloadDir, videoID)
metadataPath, err := y.GetVideoMetadataFile(videoID)
if err != nil {
log.Errorf("Error getting metadata path in preparation for video download: %v", err)
return "", err
err = downloadVideo(basePath, metadataPath)
if err != nil {
return "", nil
videoPath, err = findDownloadedVideo(y.DownloadDir, videoID)
if err != nil {
log.Errorf("Error from findDownloadedVideo() after already succeeding once: %v", err)
return "", err
if videoPath == nil {
return "", errors.New("Could not find a downloaded video after successful download.")
return *videoPath, nil
func findDownloadedVideo(videoDir, videoID string) (*string, error) {
files, err := ioutil.ReadDir(videoDir)
if err != nil {
return nil, err
for _, f := range files {
if f.IsDir() {
if path.Ext(f.Name()) == ".mp4" && strings.Contains(f.Name(), videoID) {
videoPath := path.Join(videoDir, f.Name())
return &videoPath, nil
return nil, nil
func downloadVideoMetadata(basePath, videoID string) error {
ytdlArgs := []string{
fmt.Sprintf("", videoID),
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
return err
func downloadVideo(basePath, metadataPath string) error {
ytdlArgs := []string{
"ffmpeg:-movflags faststart",
ytdlCmd := exec.Command("yt-dlp", ytdlArgs...)
output, err := runCmd(ytdlCmd)
return err
func runCmd(cmd *exec.Cmd) ([]string, error) {
log.Infof("running cmd: %s", strings.Join(cmd.Args, " "))
var err error
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
err = cmd.Start()
if err != nil {
return nil, err
outLog, err := ioutil.ReadAll(stdout)
if err != nil {
return nil, err
errorLog, err := ioutil.ReadAll(stderr)
if err != nil {
return nil, err
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
select {
case err := <-done:
if err != nil {
return nil, err
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
Normal file
Normal file
@ -0,0 +1,54 @@
package local
import (
log ""
type YtdlVideoSource struct {
downloader Ytdl
func NewYtdlVideoSource(downloadDir string) (*YtdlVideoSource, error) {
ytdl, err := NewYtdl(downloadDir)
if err != nil {
return nil, err
source := YtdlVideoSource {
downloader: *ytdl,
return &source, nil
func (s *YtdlVideoSource) GetVideo(id string) (*SourceVideo, error) {
metadata, err := s.downloader.GetVideoMetadata(id)
if err != nil {
return nil, err
videoPath, err := s.downloader.GetVideoFile(id)
if err != nil {
return nil, err
sourceVideo := SourceVideo {
ID: id,
Title: &metadata.Title,
Description: &metadata.Description,
SourceURL: "\n" + id,
Languages: []string{},
Tags: metadata.Tags,
ReleaseTime: util.PtrToInt64(time.Now().Unix()),
ThumbnailURL: nil,
FullLocalPath: videoPath,
log.Debugf("Source video retrieved via ytdl: %v", sourceVideo)
return &sourceVideo, nil
Add table
Reference in a new issue