address review comments

This commit is contained in:
Niko Storni 2018-07-23 20:01:35 -04:00
parent dfe3e8a078
commit 302e080b95
4 changed files with 68 additions and 60 deletions

View file

@ -6,8 +6,6 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"os"
"os/user"
"strconv" "strconv"
"time" "time"
@ -35,10 +33,10 @@ type SyncManager struct {
ConcurrentVideos int ConcurrentVideos int
HostName string HostName string
YoutubeChannelID string YoutubeChannelID string
YoutubeAPIKey string
youtubeAPIKey string ApiURL string
apiURL string ApiToken string
apiToken string BlobsDir string
} }
const ( const (
@ -65,9 +63,9 @@ type apiYoutubeChannel struct {
} }
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
endpoint := s.apiURL + "/yt/jobs" endpoint := s.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {s.apiToken}, "auth_token": {s.ApiToken},
"sync_status": {status}, "sync_status": {status},
"min_videos": {strconv.Itoa(1)}, "min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(s.SyncFrom))}, "after": {strconv.Itoa(int(s.SyncFrom))},
@ -96,12 +94,12 @@ type apiSyncUpdateResponse struct {
} }
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error { func (s SyncManager) setChannelSyncStatus(channelID string, status string) error {
endpoint := s.apiURL + "/yt/sync_update" endpoint := s.ApiURL + "/yt/sync_update"
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID}, "channel_id": {channelID},
"sync_server": {s.HostName}, "sync_server": {s.HostName},
"auth_token": {s.apiToken}, "auth_token": {s.ApiToken},
"sync_status": {status}, "sync_status": {status},
}) })
defer res.Body.Close() defer res.Body.Close()
@ -126,13 +124,13 @@ const (
) )
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, details string) error { func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, details string) error {
endpoint := s.apiURL + "/yt/track_video" endpoint := s.ApiURL + "/yt/track_video"
vals := url.Values{ vals := url.Values{
"youtube_channel_id": {channelID}, "youtube_channel_id": {channelID},
"youtube_video_id": {videoID}, "youtube_video_id": {videoID},
"status": {status}, "status": {status},
"auth_token": {s.apiToken}, "auth_token": {s.ApiToken},
} }
if status == VideoStatusPublished { if status == VideoStatusPublished {
if claimID == "" || claimName == "" { if claimID == "" || claimName == "" {
@ -163,19 +161,6 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
} }
func (s SyncManager) Start() error { func (s SyncManager) Start() error {
s.apiURL = os.Getenv("LBRY_API")
s.apiToken = os.Getenv("LBRY_API_TOKEN")
s.youtubeAPIKey = os.Getenv("YOUTUBE_API_KEY")
if s.apiURL == "" {
return errors.Err("An API URL was not defined. Please set the environment variable LBRY_API")
}
if s.apiToken == "" {
return errors.Err("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
}
if s.youtubeAPIKey == "" {
return errors.Err("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
}
syncCount := 0 syncCount := 0
for { for {
err := s.checkUsedSpace() err := s.checkUsedSpace()
@ -201,7 +186,7 @@ func (s SyncManager) Start() error {
} }
syncs = make([]Sync, 1) syncs = make([]Sync, 1)
syncs[0] = Sync{ syncs[0] = Sync{
YoutubeAPIKey: s.youtubeAPIKey, YoutubeAPIKey: s.YoutubeAPIKey,
YoutubeChannelID: s.YoutubeChannelID, YoutubeChannelID: s.YoutubeChannelID,
LbryChannelName: lbryChannelName, LbryChannelName: lbryChannelName,
StopOnError: s.StopOnError, StopOnError: s.StopOnError,
@ -231,7 +216,7 @@ func (s SyncManager) Start() error {
continue continue
} }
syncs = append(syncs, Sync{ syncs = append(syncs, Sync{
YoutubeAPIKey: s.youtubeAPIKey, YoutubeAPIKey: s.YoutubeAPIKey,
YoutubeChannelID: c.ChannelId, YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName, LbryChannelName: c.DesiredChannelName,
StopOnError: s.StopOnError, StopOnError: s.StopOnError,
@ -249,7 +234,7 @@ func (s SyncManager) Start() error {
time.Sleep(5 * time.Minute) time.Sleep(5 * time.Minute)
} }
for i, sync := range syncs { for i, sync := range syncs {
util.SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount) SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount)
err := sync.FullCycle() err := sync.FullCycle()
if err != nil { if err != nil {
fatalErrors := []string{ fatalErrors := []string{
@ -258,12 +243,12 @@ func (s SyncManager) Start() error {
"NotEnoughFunds", "NotEnoughFunds",
"no space left on device", "no space left on device",
} }
if util.ContainedInSlice(err.Error(), fatalErrors) { if util.SubstringInSlice(err.Error(), fatalErrors) {
return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err) return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
} }
util.SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
} }
util.SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount) SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount)
syncCount++ syncCount++
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
shouldInterruptLoop = true shouldInterruptLoop = true
@ -283,18 +268,14 @@ func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
} }
func (s SyncManager) checkUsedSpace() error { func (s SyncManager) checkUsedSpace() error {
usr, err := user.Current() usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil {
return err
}
usedPctile, err := GetUsedSpace(usr.HomeDir + "/.lbrynet/blobfiles/")
if err != nil { if err != nil {
return err return err
} }
if usedPctile >= 0.90 && !s.SkipSpaceCheck { if usedPctile >= 0.90 && !s.SkipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
} }
util.SendInfoToSlack("disk usage: %.1f%%", usedPctile*100) SendInfoToSlack("disk usage: %.1f%%", usedPctile*100)
return nil return nil
} }

View file

@ -8,7 +8,6 @@ import (
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/lbrycrd" "github.com/lbryio/lbry.go/lbrycrd"
"github.com/lbryio/lbry.go/util"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -51,9 +50,12 @@ func (s *Sync) walletSetup() error {
} }
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already published %d videos", numPublished)
if float64(numOnSource)-float64(numPublished) > maximumVideosToPublish {
numOnSource = maximumVideosToPublish
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource { if numPublished > numOnSource {
util.SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource) SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
minBalance = 1 //since we ended up in this function it means some juice is still needed minBalance = 1 //since we ended up in this function it means some juice is still needed
} }
amountToAdd, _ := decimal.NewFromFloat(minBalance).Sub(balance).Float64() amountToAdd, _ := decimal.NewFromFloat(minBalance).Sub(balance).Float64()
@ -169,7 +171,7 @@ func (s *Sync) waitUntilUTXOsConfirmed() error {
if time.Now().After(origin.Add(15 * time.Minute)) { if time.Now().After(origin.Add(15 * time.Minute)) {
//lbryum is messing with us or something. restart the daemon //lbryum is messing with us or something. restart the daemon
//this could also be a very long block //this could also be a very long block
util.SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String()) SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String())
} }
wait := 30 * time.Second wait := 30 * time.Second
log.Println("Waiting " + wait.String() + "...") log.Println("Waiting " + wait.String() + "...")

View file

@ -82,7 +82,7 @@ func (v YoutubeVideo) getFilename() string {
if len(name) < 1 { if len(name) < 1 {
name = v.id name = v.id
} }
return v.dir + "/" + v.id + "/" + name + ".mp4" return v.videoDir() + "/" + name + ".mp4"
} }
func (v YoutubeVideo) getAbbrevDescription() string { func (v YoutubeVideo) getAbbrevDescription() string {
@ -97,7 +97,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
func (v YoutubeVideo) download() error { func (v YoutubeVideo) download() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Mkdir(v.dir+"/"+v.id, 0750) err := os.Mkdir(v.videoDir(), 0750)
if err != nil && !strings.Contains(err.Error(), "file exists") { if err != nil && !strings.Contains(err.Error(), "file exists") {
return errors.Wrap(err, 0) return errors.Wrap(err, 0)
} }
@ -127,6 +127,10 @@ func (v YoutubeVideo) download() error {
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile) return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
} }
func (v YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id
}
func (v YoutubeVideo) delete() error { func (v YoutubeVideo) delete() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Remove(videoPath) err := os.Remove(videoPath)

View file

@ -30,8 +30,9 @@ import (
) )
const ( const (
channelClaimAmount = 0.01 channelClaimAmount = 0.01
publishAmount = 0.01 publishAmount = 0.01
maximumVideosToPublish = 1000
) )
type video interface { type video interface {
@ -73,6 +74,26 @@ type Sync struct {
queue chan video queue chan video
} }
// SendErrorToSlack Sends an error message to the default channel and to the process log.
func SendErrorToSlack(format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
return util.SendToSlack(":sos: " + message)
}
// SendInfoToSlack Sends an info message to the default channel and to the process log.
func SendInfoToSlack(format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Infoln(message)
return util.SendToSlack(":information_source: " + message)
}
// IsInterrupted can be queried to discover if the sync process was interrupted manually // IsInterrupted can be queried to discover if the sync process was interrupted manually
func (s *Sync) IsInterrupted() bool { func (s *Sync) IsInterrupted() bool {
select { select {
@ -100,7 +121,7 @@ func (s *Sync) FullCycle() (e error) {
noFailConditions := []string{ noFailConditions := []string{
"this youtube channel is being managed by another server", "this youtube channel is being managed by another server",
} }
if util.ContainedInSlice(e.Error(), noFailConditions) { if util.SubstringInSlice(e.Error(), noFailConditions) {
return return
} }
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
@ -109,7 +130,7 @@ func (s *Sync) FullCycle() (e error) {
err = errors.Prefix(msg, err) err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e) e = errors.Prefix(err.Error(), e)
} }
} else { } else if !s.IsInterrupted() {
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
if err != nil { if err != nil {
e = err e = err
@ -210,8 +231,8 @@ WaitForDaemonStart:
return nil return nil
} }
func logShutdownError(shutdownErr error) { func logShutdownError(shutdownErr error) {
util.SendErrorToSlack("error shutting down daemon: %v", shutdownErr) SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
util.SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
} }
func (s *Sync) doSync() error { func (s *Sync) doSync() error {
@ -281,7 +302,7 @@ func (s *Sync) startWorker(workerNum int) {
"NotEnoughFunds", "NotEnoughFunds",
"Cannot publish using channel", "Cannot publish using channel",
} }
if util.ContainedInSlice(err.Error(), fatalErrors) || s.StopOnError { if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
s.grp.Stop() s.grp.Stop()
} else if s.MaxTries > 1 { } else if s.MaxTries > 1 {
errorsNoRetry := []string{ errorsNoRetry := []string{
@ -296,7 +317,7 @@ func (s *Sync) startWorker(workerNum int) {
"Client.Timeout exceeded while awaiting headers)", "Client.Timeout exceeded while awaiting headers)",
"video is bigger than 2GB, skipping for now", "video is bigger than 2GB, skipping for now",
} }
if util.ContainedInSlice(err.Error(), errorsNoRetry) { if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all") log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries { } else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "txn-mempool-conflict") || if strings.Contains(err.Error(), "txn-mempool-conflict") ||
@ -307,19 +328,19 @@ func (s *Sync) startWorker(workerNum int) {
err = s.walletSetup() err = s.walletSetup()
if err != nil { if err != nil {
s.grp.Stop() s.grp.Stop()
util.SendErrorToSlack("Failed to setup the wallet for a refill: %v", err) SendErrorToSlack("Failed to setup the wallet for a refill: %v", err)
break break
} }
} }
log.Println("Retrying") log.Println("Retrying")
continue continue
} }
util.SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
} }
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoSStatusFailed, "", "", err.Error()) /*err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoSStatusFailed, "", "", err.Error())
if err != nil { if err != nil {
util.SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }*/
} }
break break
} }
@ -483,18 +504,18 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
if v.PlaylistPosition() > 1000 { if v.PlaylistPosition() > maximumVideosToPublish {
log.Println(v.ID() + " is old: skipping") log.Println(v.ID() + " is old: skipping")
return nil return nil
} }
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName) _, err = v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName)
if err != nil { if err != nil {
return err return err
} }
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "") /*err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
if err != nil { if err != nil {
util.SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }*/
err = s.db.SetPublished(v.ID()) err = s.db.SetPublished(v.ID())
if err != nil { if err != nil {
return err return err