move all youtube api calls into a single place

This commit is contained in:
Alex Grintsvayg 2020-07-27 14:48:05 -04:00
parent be7fd7ddd8
commit 843303301a
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
6 changed files with 253 additions and 184 deletions

View file

@ -1,32 +1,9 @@
package manager
import (
"net/http"
"github.com/lbryio/lbry.go/v2/extras/errors"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
"github.com/lbryio/ytsync/v5/ytapi"
)
func (s *Sync) CountVideos() (uint64, error) {
client := &http.Client{
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}
service, err := youtube.New(client)
if err != nil {
return 0, errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do()
if err != nil {
return 0, errors.Prefix("error getting channels", err)
}
if len(response.Items) < 1 {
return 0, errors.Err("youtube channel not found")
}
return response.Items[0].Statistics.VideoCount, nil
return ytapi.VideosInChannel(s.APIConfig.YoutubeAPIKey, s.YoutubeChannelID)
}

View file

@ -3,7 +3,6 @@ package manager
import (
"fmt"
"math"
"net/http"
"strconv"
"time"
@ -12,14 +11,13 @@ import (
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/ytapi"
"github.com/lbryio/ytsync/v5/tags_manager"
"github.com/lbryio/ytsync/v5/thumbs"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
)
func (s *Sync) enableAddressReuse() error {
@ -266,15 +264,14 @@ func (s *Sync) ensureEnoughUTXOs() error {
}
func (s *Sync) waitForNewBlock() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("waitForNewBlock").Add(time.Since(start))
}(start)
defer func(start time.Time) { timing.TimedComponent("waitForNewBlock").Add(time.Since(start)) }(time.Now())
log.Printf("regtest: %t, docker: %t", logUtils.IsRegTest(), logUtils.IsUsingDocker())
status, err := s.daemon.Status()
if err != nil {
return err
}
for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 {
time.Sleep(5 * time.Second)
status, err = s.daemon.Status()
@ -308,10 +305,12 @@ func (s *Sync) GenerateRegtestBlock() error {
if err != nil {
return errors.Prefix("error getting lbrycrd client: ", err)
}
txs, err := lbrycrd.Generate(1)
if err != nil {
return errors.Prefix("error generating new block: ", err)
}
for _, tx := range txs {
log.Info("Generated tx: ", tx.String())
}
@ -319,10 +318,8 @@ func (s *Sync) GenerateRegtestBlock() error {
}
func (s *Sync) ensureChannelOwnership() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("ensureChannelOwnership").Add(time.Since(start))
}(start)
defer func(start time.Time) { timing.TimedComponent("ensureChannelOwnership").Add(time.Since(start)) }(time.Now())
if s.LbryChannelName == "" {
return errors.Err("no channel name set")
}
@ -386,27 +383,12 @@ func (s *Sync) ensureChannelOwnership() error {
return err
}
}
client := &http.Client{
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}
service, err := youtube.New(client)
channelInfo, channelBranding, err := ytapi.ChannelInfo(s.APIConfig.YoutubeAPIKey, s.YoutubeChannelID)
if err != nil {
return errors.Prefix("error creating YouTube service", err)
return err
}
response, err := service.Channels.List("snippet,brandingSettings").Id(s.YoutubeChannelID).Do()
if err != nil {
return errors.Prefix("error getting channel details", err)
}
if len(response.Items) < 1 {
return errors.Err("youtube channel not found")
}
channelInfo := response.Items[0].Snippet
channelBranding := response.Items[0].BrandingSettings
thumbnail := thumbs.GetBestThumbnail(channelInfo.Thumbnails)
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail.Url, s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
if err != nil {
@ -467,6 +449,7 @@ func (s *Sync) ensureChannelOwnership() error {
if err != nil {
return err
}
s.lbryChannelID = c.Outputs[0].ClaimID
return s.Manager.apiConfig.SetChannelClaimID(s.YoutubeChannelID, s.lbryChannelID)
}

View file

@ -3,11 +3,9 @@ package manager
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
@ -21,6 +19,7 @@ import (
"github.com/lbryio/ytsync/v5/thumbs"
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/ytapi"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
@ -34,8 +33,6 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
)
const (
@ -47,22 +44,6 @@ const (
maxReasonLength = 500
)
type video interface {
Size() *int64
ID() string
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool, *sync.RWMutex) (*sources.SyncSummary, error)
}
// sorting videos
type byPublishedAt []video
func (a byPublishedAt) Len() int { return len(a) }
func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[j].PublishedAt()) }
// Sync stores the options that control how syncing happens
type Sync struct {
APIConfig *sdk.APIConfig
@ -87,7 +68,7 @@ type Sync struct {
lbryChannelID string
namer *namer.Namer
walletMux *sync.RWMutex
queue chan video
queue chan ytapi.Video
transferState int
clientPublishAddress string
publicKey string
@ -263,7 +244,7 @@ func (s *Sync) FullCycle() (e error) {
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.RWMutex{}
s.grp = stopGroup
s.queue = make(chan video)
s.queue = make(chan ytapi.Video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interruptChan)
@ -847,7 +828,7 @@ func (s *Sync) doSync() error {
}
func (s *Sync) startWorker(workerNum int) {
var v video
var v ytapi.Video
var more bool
for {
@ -996,108 +977,24 @@ func (s *Sync) startWorker(workerNum int) {
}
}
var mostRecentlyFailedChannel string
func (s *Sync) enqueueYoutubeVideos() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("enqueueYoutubeVideos").Add(time.Since(start))
}(start)
client := &http.Client{
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}
defer func(start time.Time) { timing.TimedComponent("enqueueYoutubeVideos").Add(time.Since(start)) }(time.Now())
service, err := youtube.New(client)
if err != nil {
return errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do()
if err != nil {
return errors.Prefix("error getting channels", err)
}
if len(response.Items) < 1 {
return errors.Err("youtube channel not found")
}
if response.Items[0].ContentDetails.RelatedPlaylists == nil {
return errors.Err("no related playlists")
}
playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads
if playlistID == "" {
return errors.Err("no channel playlist")
}
var videos []video
ipPool, err := ip_manager.GetIPPool(s.grp)
if err != nil {
return err
}
playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
nextPageToken := ""
for {
req := service.PlaylistItems.List("snippet").
PlaylistId(playlistID).
MaxResults(50).
PageToken(nextPageToken)
playlistResponse, err := req.Do()
videos, err := ytapi.Enqueue(s.APIConfig.YoutubeAPIKey, s.YoutubeChannelID, s.syncedVideos, s.Manager.SyncFlags.QuickSync, s.Manager.videosLimit, ytapi.VideoParams{
VideoDir: s.videoDirectory,
S3Config: s.Manager.GetS3AWSConfig(),
Grp: s.grp,
IPPool: ipPool,
})
if err != nil {
return errors.Prefix("error getting playlist items", err)
return err
}
if len(playlistResponse.Items) < 1 {
// If there are 50+ videos in a playlist but less than 50 are actually returned by the API, youtube will still redirect
// clients to a next page. Such next page will however be empty. This logic prevents ytsync from failing.
youtubeIsLying := len(videos) > 0
if youtubeIsLying {
break
}
if s.YoutubeChannelID == mostRecentlyFailedChannel {
return errors.Err("playlist items not found")
}
mostRecentlyFailedChannel = s.YoutubeChannelID
break //return errors.Err("playlist items not found") //TODO: will this work?
}
videoIDs := make([]string, 50)
for i, item := range playlistResponse.Items {
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
playlistMap[item.Snippet.ResourceId.VideoId] = item.Snippet
videoIDs[i] = item.Snippet.ResourceId.VideoId
}
req2 := service.Videos.List("snippet,contentDetails,recordingDetails").Id(strings.Join(videoIDs[:], ","))
videosListResponse, err := req2.Do()
if err != nil {
return errors.Prefix("error getting videos info", err)
}
for _, item := range videosListResponse.Items {
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig(), s.grp, ipPool))
}
log.Infof("Got info for %d videos from youtube API", len(videos))
nextPageToken = playlistResponse.NextPageToken
if nextPageToken == "" || s.Manager.SyncFlags.QuickSync || len(videos) >= s.Manager.videosLimit {
break
}
}
for k, v := range s.syncedVideos {
if !v.Published {
continue
}
_, ok := playlistMap[k]
if !ok {
videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig(), s.grp, ipPool))
}
}
sort.Sort(byPublishedAt(videos))
Enqueue:
for _, v := range videos {
select {
@ -1116,7 +1013,7 @@ Enqueue:
return nil
}
func (s *Sync) processVideo(v video) (err error) {
func (s *Sync) processVideo(v ytapi.Video) (err error) {
defer func() {
if p := recover(); p != nil {
logUtils.SendErrorToSlack("Video processing panic! %s", debug.Stack())
@ -1284,8 +1181,7 @@ func (s *Sync) getUnsentSupports() (float64, error) {
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess(timeout time.Duration) error {
then := time.Now()
stopTime := then.Add(time.Duration(timeout * time.Second))
stopTime := time.Now().Add(timeout * time.Second)
for !time.Now().After(stopTime) {
wait := 10 * time.Second
log.Println("the daemon is still running, waiting for it to exit")

View file

@ -13,24 +13,24 @@ import (
"sync"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/tags_manager"
"github.com/lbryio/ytsync/v5/thumbs"
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
duration "github.com/ChannelMeter/iso8601duration"
"github.com/aws/aws-sdk-go/aws"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3"
ytlib "google.golang.org/api/youtube/v3"
)
type YoutubeVideo struct {
@ -43,7 +43,7 @@ type YoutubeVideo struct {
maxVideoLength float64
publishedAt time.Time
dir string
youtubeInfo *youtube.Video
youtubeInfo *ytlib.Video
youtubeChannelID string
tags []string
awsConfig aws.Config
@ -90,7 +90,7 @@ var youtubeCategories = map[string]string{
"44": "trailers",
}
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
func NewYoutubeVideo(directory string, videoData *ytlib.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors
return &YoutubeVideo{
id: videoData.Id,

View file

@ -11,7 +11,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3"
ytlib "google.golang.org/api/youtube/v3"
)
type thumbnailUploader struct {
@ -98,7 +98,7 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro
return tu.mirroredUrl, nil
}
func GetBestThumbnail(thumbnails *youtube.ThumbnailDetails) *youtube.Thumbnail {
func GetBestThumbnail(thumbnails *ytlib.ThumbnailDetails) *ytlib.Thumbnail {
if thumbnails.Maxres != nil {
return thumbnails.Maxres
} else if thumbnails.High != nil {

213
ytapi/ytapi.go Normal file
View file

@ -0,0 +1,213 @@
package ytapi
import (
"net/http"
"sort"
"strings"
"sync"
"time"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/sources"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/aws/aws-sdk-go/aws"
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
ytlib "google.golang.org/api/youtube/v3"
)
type Video interface {
Size() *int64
ID() string
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool, *sync.RWMutex) (*sources.SyncSummary, error)
}
type byPublishedAt []Video
func (a byPublishedAt) Len() int { return len(a) }
func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[j].PublishedAt()) }
type VideoParams struct {
VideoDir string
S3Config aws.Config
Grp *stop.Group
IPPool *ip_manager.IPPool
}
var mostRecentlyFailedChannel string // TODO: fix this hack!
func Enqueue(apiKey, channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams) ([]Video, error) {
playlistID, err := PlaylistID(apiKey, channelID)
if err != nil {
return nil, err
}
playlistMap := make(map[string]*ytlib.PlaylistItemSnippet, 50)
var playlistItems []*ytlib.PlaylistItem
var nextPageToken string
var videos []Video
for {
playlistItems, nextPageToken, err = PlaylistItems(apiKey, playlistID, nextPageToken)
if len(playlistItems) < 1 {
// If there are 50+ videos in a playlist but less than 50 are actually returned by the API, youtube will still redirect
// clients to a next page. Such next page will however be empty. This logic prevents ytsync from failing.
youtubeIsLying := len(videos) > 0
if youtubeIsLying {
break
}
if channelID == mostRecentlyFailedChannel {
return nil, errors.Err("playlist items not found")
}
mostRecentlyFailedChannel = channelID
break //return errors.Err("playlist items not found") //TODO: will this work?
}
videoIDs := make([]string, len(playlistItems))
for i, item := range playlistItems {
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
playlistMap[item.Snippet.ResourceId.VideoId] = item.Snippet
videoIDs[i] = item.Snippet.ResourceId.VideoId
}
vids, err := Videos(apiKey, videoIDs)
if err != nil {
return nil, err
}
for _, item := range vids {
videos = append(videos, sources.NewYoutubeVideo(videoParams.VideoDir, item, playlistMap[item.Id].Position, videoParams.S3Config, videoParams.Grp, videoParams.IPPool))
}
log.Infof("Got info for %d videos from youtube API", len(videos))
if nextPageToken == "" || quickSync || len(videos) >= maxVideos {
break
}
}
for k, v := range syncedVideos {
if !v.Published {
continue
}
if _, ok := playlistMap[k]; !ok {
videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Grp, videoParams.IPPool))
}
}
sort.Sort(byPublishedAt(videos))
return videos, nil
}
func VideosInChannel(apiKey, channelID string) (uint64, error) {
client := &http.Client{
Transport: &transport.APIKey{Key: apiKey},
}
service, err := ytlib.New(client)
if err != nil {
return 0, errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("statistics").Id(channelID).Do()
if err != nil {
return 0, errors.Prefix("error getting channels", err)
}
if len(response.Items) < 1 {
return 0, errors.Err("youtube channel not found")
}
return response.Items[0].Statistics.VideoCount, nil
}
func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.ChannelBrandingSettings, error) {
service, err := ytlib.New(&http.Client{Transport: &transport.APIKey{Key: apiKey}})
if err != nil {
return nil, nil, errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("snippet,brandingSettings").Id(channelID).Do()
if err != nil {
return nil, nil, errors.Prefix("error getting channel details", err)
}
if len(response.Items) < 1 {
return nil, nil, errors.Err("youtube channel not found")
}
return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil
}
func PlaylistID(apiKey, channelID string) (string, error) {
service, err := ytlib.New(&http.Client{Transport: &transport.APIKey{Key: apiKey}})
if err != nil {
return "", errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("contentDetails").Id(channelID).Do()
if err != nil {
return "", errors.Prefix("error getting channel details", err)
}
if len(response.Items) < 1 {
return "", errors.Err("youtube channel not found")
}
if response.Items[0].ContentDetails.RelatedPlaylists == nil {
return "", errors.Err("no related playlists")
}
playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads
if playlistID == "" {
return "", errors.Err("no channel playlist")
}
return playlistID, nil
}
func PlaylistItems(apiKey, playlistID, nextPageToken string) ([]*ytlib.PlaylistItem, string, error) {
service, err := ytlib.New(&http.Client{Transport: &transport.APIKey{Key: apiKey}})
if err != nil {
return nil, "", errors.Prefix("error creating YouTube service", err)
}
response, err := service.PlaylistItems.List("snippet").
PlaylistId(playlistID).
MaxResults(50).
PageToken(nextPageToken).
Do()
if err != nil {
return nil, "", errors.Prefix("error getting playlist items", err)
}
return response.Items, response.NextPageToken, nil
}
func Videos(apiKey string, videoIDs []string) ([]*ytlib.Video, error) {
service, err := ytlib.New(&http.Client{Transport: &transport.APIKey{Key: apiKey}})
if err != nil {
return nil, errors.Prefix("error creating YouTube service", err)
}
response, err := service.Videos.List("snippet,contentDetails,recordingDetails").Id(strings.Join(videoIDs[:], ",")).Do()
if err != nil {
return nil, errors.Prefix("error getting videos info", err)
}
return response.Items, nil
}