From 0618da08b8b9b8bed8e73202356d47ad98263301 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Fri, 15 Sep 2017 18:13:13 -0400 Subject: [PATCH] yt sync works --- jsonrpc/daemon.go | 68 +++- ...emon_response_types.go => daemon_types.go} | 51 +++ main.go | 5 +- ytsync.go | 363 +++++++++++++++++- 4 files changed, 479 insertions(+), 8 deletions(-) rename jsonrpc/{daemon_response_types.go => daemon_types.go} (81%) diff --git a/jsonrpc/daemon.go b/jsonrpc/daemon.go index 128f05b..7aa665e 100644 --- a/jsonrpc/daemon.go +++ b/jsonrpc/daemon.go @@ -52,8 +52,11 @@ func debugParams(params map[string]interface{}) string { var s []string for k, v := range params { r := reflect.ValueOf(v) - if r.Kind() == reflect.Ptr && r.IsNil() { - continue + if r.Kind() == reflect.Ptr { + if r.IsNil() { + continue + } + v = r.Elem().Interface() } s = append(s, fmt.Sprintf("%s=%+v", k, v)) } @@ -239,7 +242,60 @@ func (d *Client) Resolve(url string) (*ResolveResponse, error) { }) } -//func (d *Client) Publish() (*PublishResponse, error) { -// response := new(PublishResponse) -// return response, d.call(response, "publish") -//} +func (d *Client) ChannelNew(name string, amount float64) (*ChannelNewResponse, error) { + response := new(ChannelNewResponse) + return response, d.call(response, "channel_new", map[string]interface{}{ + "channel_name": name, + "amount": amount, + }) +} + +func (d *Client) ChannelListMine() (*ChannelListMineResponse, error) { + response := new(ChannelListMineResponse) + return response, d.call(response, "channel_list_mine", map[string]interface{}{}) +} + +func (d *Client) WalletList() (*WalletListResponse, error) { + response := new(WalletListResponse) + return response, d.call(response, "wallet_list", map[string]interface{}{}) +} + +type PublishOptions struct { + Fee *Fee + Title *string + Description *string + Author *string + Language *string + License *string + LicenseURL *string + Thumbnail *string + Preview *string + NSFW *bool + ChannelName *string + ChannelID *string + ClaimAddress *string + ChangeAddress *string +} + +func (d *Client) Publish(name, filePath string, bid float64, options PublishOptions) (*PublishResponse, error) { + response := new(PublishResponse) + return response, d.call(response, "publish", map[string]interface{}{ + "name": name, + "file_path": filePath, + "bid": bid, + "fee": options.Fee, + "title": options.Title, + "description": options.Description, + "author": options.Author, + "language": options.Language, + "license": options.License, + "license_url": options.LicenseURL, + "thumbnail": options.Thumbnail, + "preview": options.Preview, + "nsfw": options.NSFW, + "channel_name": options.ChannelName, + "channel_id": options.ChannelID, + "claim_address": options.ClaimAddress, + "change_address": options.ChangeAddress, + }) +} diff --git a/jsonrpc/daemon_response_types.go b/jsonrpc/daemon_types.go similarity index 81% rename from jsonrpc/daemon_response_types.go rename to jsonrpc/daemon_types.go index 46928a9..f0af43f 100644 --- a/jsonrpc/daemon_response_types.go +++ b/jsonrpc/daemon_types.go @@ -8,6 +8,20 @@ import ( lbryschema "github.com/lbryio/lbryschema.go/pb" ) +type Currency string + +const ( + CurrencyLBC = Currency("LBC") + CurrencyUSD = Currency("USD") + CurrencyBTC = Currency("BTC") +) + +type Fee struct { + Currency Currency `json:"currency"` + Amount float64 `json:"amount"` + Address *string `json:"address"` +} + type Support struct { Amount float64 `json:"amount"` Nout int `json:"nout"` @@ -212,4 +226,41 @@ type ResolveResponseItem struct { Certificate *Claim `json:"certificate,omitempty"` Claim *Claim `json:"claim,omitempty"` ClaimsInChannel *uint64 `json:"claims_in_channel,omitempty"` + Error *string `json:"error,omitempty"` +} + +type ChannelNewResponse struct { + ClaimID string `json:"claim_id"` + Fee string `json:"fee"` + Nout int `json:"nout"` + Success bool `json:"success"` + Tx string `json:"tx"` + Txid string `json:"txid"` +} + +type ChannelListMineResponse []struct { + Address string `json:"address"` + Amount float64 `json:"amount"` + BlocksToExpiration int `json:"blocks_to_expiration"` + CanSign bool `json:"can_sign"` + Category string `json:"category"` + ClaimID string `json:"claim_id"` + Confirmations int `json:"confirmations"` + DecodedClaim bool `json:"decoded_claim"` + ExpirationHeight int `json:"expiration_height"` + Expired bool `json:"expired"` + HasSignature bool `json:"has_signature"` + Height int `json:"height"` + Hex string `json:"hex"` + IsPending bool `json:"is_pending"` + IsSpent bool `json:"is_spent"` + Name string `json:"name"` + Nout int `json:"nout"` + Txid string `json:"txid"` + Value *lbryschema.Claim `json:"value"` +} + +type WalletListResponse []string + +type PublishResponse struct { } diff --git a/main.go b/main.go index c147c26..db0406d 100644 --- a/main.go +++ b/main.go @@ -9,5 +9,8 @@ func main() { //franklin() - ytsync() + err := ytsync() + if err != nil { + panic(err) + } } diff --git a/ytsync.go b/ytsync.go index 0a2a942..78b4b57 100644 --- a/ytsync.go +++ b/ytsync.go @@ -1,5 +1,366 @@ package main -func ytsync() { +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "regexp" + "strings" + "sync" + "time" + "github.com/lbryio/lbry.go/jsonrpc" + + ytdl "github.com/kkdai/youtube" + log "github.com/sirupsen/logrus" + "google.golang.org/api/googleapi/transport" + "google.golang.org/api/youtube/v3" +) + +const ( + concurrentVideos = 1 +) + +type video struct { + id string + channelID string + channelTitle string + title string + description string +} + +var ( + daemon *jsonrpc.Client + channelID string + lbryChannelName string + claimChannel bool + claimAddress string + videoDirectory string + ytAPIKey string +) + +func ytsync() error { + var err error + + flag.StringVar(&ytAPIKey, "ytApiKey", "", "Youtube API key (required)") + flag.StringVar(&channelID, "channelID", "", "ID of the youtube channel to sync (required)") + flag.StringVar(&lbryChannelName, "lbryChannel", "", "Publish videos into this channel") + flag.BoolVar(&claimChannel, "claimChannel", false, "Claim channel if we do not own it") + flag.Parse() + + if channelID == "" || ytAPIKey == "" { + flag.Usage() + return nil + } + + var wg sync.WaitGroup + videoQueue := make(chan video) + + daemon = jsonrpc.NewClient("") + videoDirectory, err = ioutil.TempDir("", "ytsync") + if err != nil { + return err + } + + if lbryChannelName != "" { + err = ensureChannelOwnership() + if err != nil { + return err + } + } + + addresses, err := daemon.WalletList() + if err != nil { + return err + } else if addresses == nil || len(*addresses) == 0 { + return fmt.Errorf("Could not find an address in wallet") + } + claimAddress = (*addresses)[0] + if claimAddress == "" { + return fmt.Errorf("Found blank claim address") + } + + for i := 0; i < concurrentVideos; i++ { + go func() { + wg.Add(1) + defer wg.Done() + + for { + v, more := <-videoQueue + if !more { + return + } + err := processVideo(v) + if err != nil { + log.Errorln("error processing video: " + err.Error()) + } + } + }() + } + + err = enqueueVideosFromChannel(channelID, &videoQueue) + if err != nil { + return err + } + close(videoQueue) + + wg.Wait() + return nil +} + +func ensureChannelOwnership() error { + channels, err := daemon.ChannelListMine() + if err != nil { + return err + } else if channels == nil { + return fmt.Errorf("No channels") + } + + for _, channel := range *channels { + if channel.Name == lbryChannelName { + return nil + } + } + + resolveResp, err := daemon.Resolve(lbryChannelName) + if err != nil { + return err + } + + channelNotFound := (*resolveResp)[lbryChannelName].Error == nil || strings.Contains(*((*resolveResp)[lbryChannelName].Error), "cannot be resolved") + + if !channelNotFound { + return fmt.Errorf("Channel exists and we don't own it. Pick another channel.") + } + + if !claimChannel { + return fmt.Errorf("Channel does not exist. Create it with -claimChannel") + } + + _, err = daemon.ChannelNew(lbryChannelName, 0.01) + if err != nil { + return err + } + + // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" + wait := 15 * time.Second + log.Println("Waiting " + wait.String() + " for channel claim to go through") + time.Sleep(wait) + + return nil +} + +func enqueueVideosFromChannel(channelID string, videoChan *chan video) error { + client := &http.Client{ + Transport: &transport.APIKey{Key: ytAPIKey}, + } + + service, err := youtube.New(client) + if err != nil { + return fmt.Errorf("Error creating YouTube service: %v", err) + } + + response, err := service.Channels.List("contentDetails").Id(channelID).Do() + if err != nil { + return fmt.Errorf("Error getting channels: %v", err) + } + + if len(response.Items) < 1 { + return fmt.Errorf("Youtube channel not found") + } + + if response.Items[0].ContentDetails.RelatedPlaylists == nil { + return fmt.Errorf("No related playlists") + } + + playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads + if playlistID == "" { + return fmt.Errorf("No channel playlist") + } + + firstRequest := true + nextPageToken := "" + + for firstRequest || nextPageToken != "" { + req := service.PlaylistItems.List("snippet").PlaylistId(playlistID).MaxResults(50) + if nextPageToken != "" { + req.PageToken(nextPageToken) + } + + playlistResponse, err := req.Do() + if err != nil { + return fmt.Errorf("Error getting playlist items: %v", err) + } + + if len(playlistResponse.Items) < 1 { + return fmt.Errorf("Playlist items not found") + } + + for _, item := range playlistResponse.Items { + // todo: there's thumbnail info here. why did we need lambda??? + *videoChan <- video{ + id: item.Snippet.ResourceId.VideoId, + channelID: channelID, + title: item.Snippet.Title, + description: item.Snippet.Description, + channelTitle: item.Snippet.ChannelTitle, + } + } + + nextPageToken = playlistResponse.NextPageToken + firstRequest = false + } + + return nil +} + +func processVideo(v video) error { + log.Println("Processing " + v.id) + + //download and thumbnail can be done in parallel + err := downloadVideo(v.id) + if err != nil { + return fmt.Errorf("download error: %s", err.Error()) + } + + err = triggerThumbnailSave(v.id) + if err != nil { + return fmt.Errorf("thumbnail error: %s", err.Error()) + } + + err = publish(v) + if err != nil { + return fmt.Errorf("publish error: %s", err.Error()) + } + + return nil +} + +func downloadVideo(videoID string) error { + verbose := false + videoPath := videoDirectory + "/" + videoID + ".mp4" + + _, err := os.Stat(videoPath) + if err != nil && !os.IsNotExist(err) { + return err + } else if err == nil { + log.Println(videoID + " already exists at " + videoPath) + return nil + } + + downloader := ytdl.NewYoutube(verbose) + err = downloader.DecodeURL("https://www.youtube.com/watch?v=" + videoID) + if err != nil { + return err + } + err = downloader.StartDownload(videoPath) + if err != nil { + return err + } + log.Debugln("Downloaded " + videoID) + return nil +} + +func triggerThumbnailSave(videoID string) error { + client := &http.Client{Timeout: 30 * time.Second} + + params, err := json.Marshal(map[string]string{"videoid": videoID}) + if err != nil { + return err + } + + request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params)) + if err != nil { + return err + } + + response, err := client.Do(request) + if err != nil { + return err + } + defer response.Body.Close() + + contents, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + + var decoded struct { + error int `json:"error"` + url string `json:"url,omitempty"` + message string `json:"message,omitempty"` + } + err = json.Unmarshal(contents, &decoded) + if err != nil { + return err + } + + if decoded.error != 0 { + return fmt.Errorf("error creating thumbnail: " + decoded.message) + } + + log.Debugln("Created thumbnail for " + videoID) + + return nil +} + +func strPtr(s string) *string { return &s } + +func titleToClaimName(name string) string { + maxLen := 40 + reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) + + chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(name, "-"), "-")), "-") + + name = chunks[0] + if len(name) > maxLen { + return name[:maxLen] + } + + for _, chunk := range chunks[1:] { + tmpName := name + "-" + chunk + if len(tmpName) > maxLen { + if len(name) < 20 { + name = tmpName[:maxLen] + } + break + } + name = tmpName + } + + return name +} + +func publish(v video) error { + maxDescLines := 10 + descriptionLines := strings.Split(v.description, "\n") + var description string + if len(descriptionLines) > maxDescLines { + description = strings.Join(descriptionLines[:maxDescLines], "\n") + "\n..." + } else { + description = strings.Join(descriptionLines, "\n") + } + + options := jsonrpc.PublishOptions{ + Title: &v.title, + Author: &v.channelTitle, + Description: &description, + Language: strPtr("en"), + ClaimAddress: &claimAddress, + Thumbnail: strPtr("http://berk.ninja/thumbnails/" + v.id), + License: strPtr("Copyrighted (Contact Author)"), + } + if lbryChannelName != "" { + options.ChannelName = &lbryChannelName + } + _, err := daemon.Publish(titleToClaimName(v.title), videoDirectory+"/"+v.id+".mp4", 0.01, options) + if err != nil { + return err + } + + return nil }