remove common.go

merge sync and selfsync
export sync manager out of cmd package
clean up ytsync.go
address #19 and #20
This commit is contained in:
Niko Storni 2018-07-17 12:54:22 -04:00
parent b6eb3db397
commit 453ba5450b
2 changed files with 285 additions and 28 deletions

255
manager.go Normal file
View file

@ -0,0 +1,255 @@
package ytsync
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/user"
"strconv"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util"
log "github.com/sirupsen/logrus"
)
type SyncManager struct {
StopOnError bool
MaxTries int
TakeOverExistingChannel bool
Refill int
Limit int
SkipSpaceCheck bool
SyncUpdate bool
SyncStatus string
SyncFrom int64
SyncUntil int64
ConcurrentJobs int
ConcurrentVideos int
HostName string
YoutubeChannelID string
youtubeAPIKey string
apiURL string
apiToken string
}
const (
StatusPending = "pending" // waiting for permission to sync
StatusQueued = "queued" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done
StatusFailed = "failed"
)
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed}
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []apiYoutubeChannel `json:"data"`
}
type apiYoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
}
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
endpoint := s.apiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {s.apiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(s.SyncFrom))},
"before": {strconv.Itoa(int(s.SyncUntil))},
//"sync_server": {s.HostName},
"channel_id": {s.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiJobsResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
log.Printf("Fetched channels: %d", len(response.Data))
return response.Data, nil
}
type apiSyncUpdateResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error {
endpoint := s.apiURL + "/yt/sync_update"
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {s.HostName},
"auth_token": {s.apiToken},
"sync_status": {status},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiSyncUpdateResponse
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response")
}
func (s SyncManager) Start() error {
s.apiURL = os.Getenv("LBRY_API")
s.apiToken = os.Getenv("LBRY_API_TOKEN")
s.youtubeAPIKey = os.Getenv("YOUTUBE_API_KEY")
if s.apiURL == "" {
return errors.Err("An API URL was not defined. Please set the environment variable LBRY_API")
}
if s.apiToken == "" {
return errors.Err("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
}
if s.youtubeAPIKey == "" {
return errors.Err("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
}
syncCount := 0
for {
err := s.checkUsedSpace()
if err != nil {
return err
}
var syncs []Sync
shouldInterruptLoop := false
isSingleChannelSync := s.YoutubeChannelID != ""
if isSingleChannelSync {
channels, err := s.fetchChannels("")
if err != nil {
return err
}
if len(channels) != 1 {
return errors.Err("Expected 1 channel, %d returned", len(channels))
}
lbryChannelName := channels[0].DesiredChannelName
if !s.isWorthProcessing(channels[0]) {
break
}
syncs = make([]Sync, 1)
syncs[0] = Sync{
YoutubeAPIKey: s.youtubeAPIKey,
YoutubeChannelID: s.YoutubeChannelID,
LbryChannelName: lbryChannelName,
StopOnError: s.StopOnError,
MaxTries: s.MaxTries,
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
}
shouldInterruptLoop = true
} else {
var queuesToSync []string
if s.SyncStatus != "" {
queuesToSync = append(queuesToSync, s.SyncStatus)
} else if s.SyncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
} else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
}
for _, q := range queuesToSync {
channels, err := s.fetchChannels(q)
if err != nil {
return err
}
for _, c := range channels {
if !s.isWorthProcessing(c) {
continue
}
syncs = append(syncs, Sync{
YoutubeAPIKey: s.youtubeAPIKey,
YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName,
StopOnError: s.StopOnError,
MaxTries: s.MaxTries,
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
})
}
}
}
if len(syncs) == 0 {
log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute)
}
for i, sync := range syncs {
util.SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount)
err := sync.FullCycle()
if err != nil {
fatalErrors := []string{
"default_wallet already exists",
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
"NotEnoughFunds",
"no space left on device",
}
if util.InSliceContains(err.Error(), fatalErrors) {
return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
}
util.SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
}
util.SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount)
syncCount++
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
shouldInterruptLoop = true
break
}
}
if shouldInterruptLoop {
break
}
}
return nil
}
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
}
func (s SyncManager) checkUsedSpace() error {
usr, err := user.Current()
if err != nil {
return err
}
usedPctile, err := util.GetUsedSpace(usr.HomeDir + "/.lbrynet/blobfiles/")
if err != nil {
return err
}
if usedPctile >= 0.90 && !s.SkipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
}
util.SendInfoToSlack("disk usage: %.1f%%", usedPctile*100)
return nil
}

View file

@ -59,6 +59,7 @@ type Sync struct {
ConcurrentVideos int
TakeOverExistingChannel bool
Refill int
Manager *SyncManager
daemon *jsonrpc.Client
claimAddress string
@ -82,19 +83,40 @@ func (s *Sync) IsInterrupted() bool {
}
}
func (s *Sync) FullCycle() error {
var err error
func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found")
}
if s.YoutubeChannelID == "" {
channelID, err := getChannelIDFromFile(s.LbryChannelName)
if err != nil {
return err
}
s.YoutubeChannelID = channelID
return errors.Err("channel ID not provided")
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
if err != nil {
return err
}
defer func() {
if e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.InSliceContains(e.Error(), noFailConditions) {
return
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e)
}
} else {
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
e = err
}
}
}()
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
@ -493,26 +515,6 @@ func stopDaemonViaSystemd() error {
return nil
}
func getChannelIDFromFile(channelName string) (string, error) {
channelsJSON, err := ioutil.ReadFile("./channels")
if err != nil {
return "", errors.Wrap(err, 0)
}
var channels map[string]string
err = json.Unmarshal(channelsJSON, &channels)
if err != nil {
return "", errors.Wrap(err, 0)
}
channelID, ok := channels[channelName]
if !ok {
return "", errors.Err("channel not in list")
}
return channelID, nil
}
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess(timeout time.Duration) error {
processes, err := ps.Processes()