lbry.go/ytsync/manager.go

335 lines
10 KiB
Go
Raw Normal View History

package ytsync
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"syscall"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util"
log "github.com/sirupsen/logrus"
)
type SyncManager struct {
StopOnError bool
MaxTries int
TakeOverExistingChannel bool
Refill int
Limit int
SkipSpaceCheck bool
SyncUpdate bool
SyncStatus string
SyncFrom int64
SyncUntil int64
ConcurrentJobs int
ConcurrentVideos int
HostName string
YoutubeChannelID string
2018-07-24 02:01:35 +02:00
YoutubeAPIKey string
ApiURL string
ApiToken string
BlobsDir string
2018-08-01 14:56:04 +02:00
VideosLimit int
MaxVideoSize int
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
SingleRun bool
}
const (
2018-08-03 13:02:44 +02:00
StatusPending = "pending" // waiting for permission to sync
StatusQueued = "queued" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done
StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed
)
2018-08-03 13:02:44 +02:00
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []apiYoutubeChannel `json:"data"`
}
type apiYoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
}
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
2018-07-24 02:01:35 +02:00
endpoint := s.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
2018-07-24 02:01:35 +02:00
"auth_token": {s.ApiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(s.SyncFrom))},
"before": {strconv.Itoa(int(s.SyncUntil))},
"sync_server": {s.HostName},
"channel_id": {s.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiJobsResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
log.Printf("Fetched channels: %d", len(response.Data))
return response.Data, nil
}
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []syncedVideo `json:"data"`
}
type syncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {s.HostName},
"auth_token": {s.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]syncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, claimNames, nil
}
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
VideoStatusPublished = "published"
2018-08-02 16:37:22 +02:00
VideoStatusFailed = "failed"
)
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := s.ApiURL + "/yt/video_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
"status": {status},
2018-07-24 02:01:35 +02:00
"auth_token": {s.ApiToken},
}
if status == VideoStatusPublished {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
2018-08-14 17:09:23 +02:00
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
func (s *SyncManager) Start() error {
syncCount := 0
for {
err := s.checkUsedSpace()
if err != nil {
return err
}
var syncs []Sync
shouldInterruptLoop := false
isSingleChannelSync := s.YoutubeChannelID != ""
if isSingleChannelSync {
channels, err := s.fetchChannels("")
if err != nil {
return err
}
if len(channels) != 1 {
return errors.Err("Expected 1 channel, %d returned", len(channels))
}
lbryChannelName := channels[0].DesiredChannelName
syncs = make([]Sync, 1)
syncs[0] = Sync{
2018-07-24 02:01:35 +02:00
YoutubeAPIKey: s.YoutubeAPIKey,
YoutubeChannelID: s.YoutubeChannelID,
LbryChannelName: lbryChannelName,
StopOnError: s.StopOnError,
MaxTries: s.MaxTries,
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
}
shouldInterruptLoop = true
} else {
var queuesToSync []string
if s.SyncStatus != "" {
queuesToSync = append(queuesToSync, s.SyncStatus)
} else if s.SyncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
} else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
}
for _, q := range queuesToSync {
channels, err := s.fetchChannels(q)
if err != nil {
return err
}
for _, c := range channels {
syncs = append(syncs, Sync{
2018-07-24 02:01:35 +02:00
YoutubeAPIKey: s.YoutubeAPIKey,
YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName,
StopOnError: s.StopOnError,
MaxTries: s.MaxTries,
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
})
}
}
}
if len(syncs) == 0 {
log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute)
}
for i, sync := range syncs {
shouldNotCount := false
2018-08-22 16:56:02 +02:00
SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
err := sync.FullCycle()
if err != nil {
fatalErrors := []string{
"default_wallet already exists",
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
"NotEnoughFunds",
"no space left on device",
"failure uploading wallet",
}
2018-07-24 02:01:35 +02:00
if util.SubstringInSlice(err.Error(), fatalErrors) {
return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
}
shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server")
2018-08-22 16:56:02 +02:00
if !shouldNotCount {
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
}
}
2018-08-22 16:56:02 +02:00
SendInfoToSlack("Syncing %s (%s) reached an end. (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
if !shouldNotCount {
syncCount++
}
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
shouldInterruptLoop = true
break
}
}
if shouldInterruptLoop || s.SingleRun {
break
}
}
return nil
}
func (s *SyncManager) checkUsedSpace() error {
2018-07-24 02:01:35 +02:00
usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil {
return err
}
if usedPctile >= 0.90 && !s.SkipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
}
2018-08-21 20:07:30 +02:00
log.Infof("disk usage: %.1f%%", usedPctile*100)
return nil
}
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path
func GetUsedSpace(path string) (float32, error) {
var stat syscall.Statfs_t
err := syscall.Statfs(path, &stat)
if err != nil {
return 0, err
}
// Available blocks * size per block = available space in bytes
all := stat.Blocks * uint64(stat.Bsize)
free := stat.Bfree * uint64(stat.Bsize)
used := all - free
return float32(used) / float32(all), nil
}