Ytsync refactor #45

Merged
nikooo777 merged 5 commits from ytsync-refactor into master 2018-10-08 21:44:59 +02:00
2 changed files with 182 additions and 121 deletions
Showing only changes of commit ba61e2c7e9 - Show all commits

View file

@ -1,12 +1,7 @@
package ytsync package ytsync
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings" "strings"
"syscall" "syscall"
"time" "time"
@ -14,6 +9,7 @@ import (
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null" "github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -25,16 +21,8 @@ type SyncManager struct {
Limit int Limit int
SkipSpaceCheck bool SkipSpaceCheck bool
SyncUpdate bool SyncUpdate bool
SyncStatus string
SyncFrom int64
SyncUntil int64
ConcurrentJobs int ConcurrentJobs int
ConcurrentVideos int ConcurrentVideos int
HostName string
YoutubeChannelID string
YoutubeAPIKey string
ApiURL string
ApiToken string
BlobsDir string BlobsDir string
VideosLimit int VideosLimit int
MaxVideoSize int MaxVideoSize int
@ -42,8 +30,11 @@ type SyncManager struct {
AwsS3ID string AwsS3ID string
AwsS3Secret string AwsS3Secret string
AwsS3Region string AwsS3Region string
SyncStatus string
AwsS3Bucket string AwsS3Bucket string
SingleRun bool SingleRun bool
ChannelProperties *sdk.ChannelProperties
APIConfig *sdk.APIConfig
} }
const ( const (
@ -68,31 +59,11 @@ type apiYoutubeChannel struct {
TotalVideos uint `json:"total_videos"` TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"` DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"` SyncServer null.String `json:"sync_server"`
} Fee *struct {
Amount string `json:"amount"`
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { Address string `json:"address"`
endpoint := s.ApiURL + "/yt/jobs" Currency string `json:"currency"`
res, _ := http.PostForm(endpoint, url.Values{ } `json:"fee"`
"auth_token": {s.ApiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(s.SyncFrom))},
"before": {strconv.Itoa(int(s.SyncUntil))},
"sync_server": {s.HostName},
"channel_id": {s.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiJobsResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
log.Printf("Fetched channels: %d", len(response.Data))
return response.Data, nil
} }
type apiChannelStatusResponse struct { type apiChannelStatusResponse struct {
@ -108,91 +79,11 @@ type syncedVideo struct {
ClaimName string `json:"claim_name"` ClaimName string `json:"claim_name"`
} }
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {s.HostName},
"auth_token": {s.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]syncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, claimNames, nil
}
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const ( const (
VideoStatusPublished = "published" VideoStatusPublished = "published"
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
) )
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := s.ApiURL + "/yt/video_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
"status": {status},
"auth_token": {s.ApiToken},
}
if status == VideoStatusPublished {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
func (s *SyncManager) Start() error { func (s *SyncManager) Start() error {
syncCount := 0 syncCount := 0
for { for {
@ -204,9 +95,9 @@ func (s *SyncManager) Start() error {
var syncs []Sync var syncs []Sync
shouldInterruptLoop := false shouldInterruptLoop := false
isSingleChannelSync := s.YoutubeChannelID != "" isSingleChannelSync := s.ChannelProperties.YoutubeChannelID != ""
if isSingleChannelSync { if isSingleChannelSync {
channels, err := s.fetchChannels("") channels, err := s.APIConfig.FetchChannels("", s.ChannelProperties)
if err != nil { if err != nil {
return err return err
} }
@ -242,7 +133,7 @@ func (s *SyncManager) Start() error {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
} }
for _, q := range queuesToSync { for _, q := range queuesToSync {
channels, err := s.fetchChannels(q) channels, err := s.APIConfig.FetchChannels(q, s.ChannelProperties)
if err != nil { if err != nil {
return err return err
} }

170
ytsync/sdk/api.go Normal file
View file

@ -0,0 +1,170 @@
package sdk
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
)
const (
MaxReasonLength = 500
)
type APIConfig struct {
YoutubeAPIKey string
ApiURL string
ApiToken string
HostName string
}
type ChannelProperties struct {
SyncFrom int64
SyncUntil int64
YoutubeChannelID string
}
type YoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
Fee *struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
} `json:"fee"`
}
func (a *APIConfig) FetchChannels(status string, cp *ChannelProperties) ([]YoutubeChannel, error) {
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []YoutubeChannel `json:"data"`
}
endpoint := a.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {a.ApiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(cp.SyncFrom))},
"before": {strconv.Itoa(int(cp.SyncUntil))},
"sync_server": {a.HostName},
"channel_id": {cp.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiJobsResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
log.Printf("Fetched channels: %d", len(response.Data))
return response.Data, nil
}
type SyncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
func (a *APIConfig) setChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []SyncedVideo `json:"data"`
}
endpoint := a.ApiURL + "/yt/channel_status"
if len(failureReason) > MaxReasonLength {
failureReason = failureReason[:MaxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {a.HostName},
"auth_token": {a.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]SyncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, claimNames, nil
}
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
)
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := a.ApiURL + "/yt/video_status"
if len(failureReason) > MaxReasonLength {
failureReason = failureReason[:MaxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
"status": {status},
"auth_token": {a.ApiToken},
}
if status == VideoStatusPublished {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}