Better tracking (Size and failure reason) #35

Merged
nikooo777 merged 8 commits from better-tracking into master 2018-09-26 22:26:49 +02:00
4 changed files with 57 additions and 53 deletions
Showing only changes of commit 697a6ad6bb - Show all commits

View file

@ -70,7 +70,7 @@ type apiYoutubeChannel struct {
SyncServer null.String `json:"sync_server"` SyncServer null.String `json:"sync_server"`
} }
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
endpoint := s.ApiURL + "/yt/jobs" endpoint := s.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {s.ApiToken}, "auth_token": {s.ApiToken},
@ -107,14 +107,17 @@ type syncedVideo struct {
FailureReason string `json:"failure_reason"` FailureReason string `json:"failure_reason"`
} }
func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) { func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, error) {
endpoint := s.ApiURL + "/yt/channel_status" endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID}, "channel_id": {channelID},
"sync_server": {s.HostName}, "sync_server": {s.HostName},
"auth_token": {s.ApiToken}, "auth_token": {s.ApiToken},
"sync_status": {status}, "sync_status": {status},
"failure_reason": {failureReason},
}) })
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
@ -141,9 +144,11 @@ const (
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
) )
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := s.ApiURL + "/yt/video_status" endpoint := s.ApiURL + "/yt/video_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
vals := url.Values{ vals := url.Values{
"youtube_channel_id": {channelID}, "youtube_channel_id": {channelID},
"video_id": {videoID}, "video_id": {videoID},
@ -162,10 +167,6 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
} }
} }
if failureReason != "" { if failureReason != "" {
maxReasonLength := 500
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:500]
}
vals.Add("failure_reason", failureReason) vals.Add("failure_reason", failureReason)
} }
res, _ := http.PostForm(endpoint, vals) res, _ := http.PostForm(endpoint, vals)
@ -189,7 +190,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
return errors.Err("invalid API response. Status code: %d", res.StatusCode) return errors.Err("invalid API response. Status code: %d", res.StatusCode)
} }
func (s SyncManager) Start() error { func (s *SyncManager) Start() error {
syncCount := 0 syncCount := 0
for { for {
err := s.checkUsedSpace() err := s.checkUsedSpace()
@ -223,7 +224,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.AwsS3Secret,
@ -258,7 +259,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.AwsS3Secret,
@ -308,11 +309,11 @@ func (s SyncManager) Start() error {
return nil return nil
} }
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool { func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName) return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
} }
func (s SyncManager) checkUsedSpace() error { func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir) usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil { if err != nil {
return err return err

View file

@ -28,9 +28,9 @@ type ucbVideo struct {
dir string dir string
} }
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo { func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
return ucbVideo{ return &ucbVideo{
id: id, id: id,
title: title, title: title,
description: description, description: description,
@ -40,19 +40,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi
} }
} }
func (v ucbVideo) ID() string { func (v *ucbVideo) ID() string {
return v.id return v.id
} }
lyoshenka commented 2018-08-27 19:59:17 +02:00 (Migrated from github.com)
Review

why are these fields here if they never get used?

why are these fields here if they never get used?
lyoshenka commented 2018-08-27 20:00:08 +02:00 (Migrated from github.com)
Review

and having syncedVideosMux on each video is weird. whats going on?

and having syncedVideosMux on each video is weird. whats going on?
nikooo777 commented 2018-08-27 20:12:54 +02:00 (Migrated from github.com)
Review

there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim.
I however agree with your comment about passing down the mutex, I'm going to correct that.

there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim. I however agree with your comment about passing down the mutex, I'm going to correct that.
func (v ucbVideo) PlaylistPosition() int { func (v *ucbVideo) PlaylistPosition() int {
return 0 return 0
} }
func (v ucbVideo) IDAndNum() string { func (v *ucbVideo) IDAndNum() string {
return v.ID() + " (?)" return v.ID() + " (?)"
} }
func (v ucbVideo) PublishedAt() time.Time { func (v *ucbVideo) PublishedAt() time.Time {
return v.publishedAt return v.publishedAt
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`) //r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
//matches := r.FindStringSubmatch(v.title) //matches := r.FindStringSubmatch(v.title)
@ -65,11 +65,11 @@ func (v ucbVideo) PublishedAt() time.Time {
//return time.Now() //return time.Now()
} }
func (v ucbVideo) getFilename() string { func (v *ucbVideo) getFilename() string {
return v.dir + "/" + v.id + ".mp4" return v.dir + "/" + v.id + ".mp4"
} }
func (v ucbVideo) getClaimName(attempt int) string { func (v *ucbVideo) getClaimName(attempt int) string {
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
suffix := "" suffix := ""
if attempt > 1 { if attempt > 1 {
@ -98,7 +98,7 @@ func (v ucbVideo) getClaimName(attempt int) string {
return name + suffix return name + suffix
} }
func (v ucbVideo) getAbbrevDescription() string { func (v *ucbVideo) getAbbrevDescription() string {
maxLines := 10 maxLines := 10
description := strings.TrimSpace(v.description) description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines { if strings.Count(description, "\n") < maxLines {
@ -107,7 +107,7 @@ func (v ucbVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
} }
func (v ucbVideo) download() error { func (v *ucbVideo) download() error {
videoPath := v.getFilename() videoPath := v.getFilename()
_, err := os.Stat(videoPath) _, err := os.Stat(videoPath)
@ -146,7 +146,7 @@ func (v ucbVideo) download() error {
return nil return nil
} }
func (v ucbVideo) saveThumbnail() error { func (v *ucbVideo) saveThumbnail() error {
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id) resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
if err != nil { if err != nil {
return err return err
@ -170,7 +170,7 @@ func (v ucbVideo) saveThumbnail() error {
return err return err
} }
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{ options := jsonrpc.PublishOptions{
Title: &v.title, Title: &v.title,
Author: strPtr("UC Berkeley"), Author: strPtr("UC Berkeley"),
@ -186,11 +186,11 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
func (v ucbVideo) Size() *int64 { func (v *ucbVideo) Size() *int64 {
return nil return nil
} }
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {

View file

@ -30,9 +30,9 @@ type YoutubeVideo struct {
dir string dir string
} }
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo { func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
return YoutubeVideo{ return &YoutubeVideo{
id: snippet.ResourceId.VideoId, id: snippet.ResourceId.VideoId,
title: snippet.Title, title: snippet.Title,
description: snippet.Description, description: snippet.Description,
@ -43,23 +43,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You
} }
} }
func (v YoutubeVideo) ID() string { func (v *YoutubeVideo) ID() string {
return v.id return v.id
} }
func (v YoutubeVideo) PlaylistPosition() int { func (v *YoutubeVideo) PlaylistPosition() int {
return int(v.playlistPosition) return int(v.playlistPosition)
} }
func (v YoutubeVideo) IDAndNum() string { func (v *YoutubeVideo) IDAndNum() string {
return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)" return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
} }
func (v YoutubeVideo) PublishedAt() time.Time { func (v *YoutubeVideo) PublishedAt() time.Time {
return v.publishedAt return v.publishedAt
} }
func (v YoutubeVideo) getFilename() string { func (v *YoutubeVideo) getFilename() string {
maxLen := 30 maxLen := 30
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
@ -86,7 +86,7 @@ func (v YoutubeVideo) getFilename() string {
return v.videoDir() + "/" + name + ".mp4" return v.videoDir() + "/" + name + ".mp4"
} }
func (v YoutubeVideo) getAbbrevDescription() string { func (v *YoutubeVideo) getAbbrevDescription() string {
maxLines := 10 maxLines := 10
description := strings.TrimSpace(v.description) description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines { if strings.Count(description, "\n") < maxLines {
@ -95,7 +95,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
} }
func (v YoutubeVideo) download() error { func (v *YoutubeVideo) download() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Mkdir(v.videoDir(), 0750) err := os.Mkdir(v.videoDir(), 0750)
@ -128,11 +128,11 @@ func (v YoutubeVideo) download() error {
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile) return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
} }
func (v YoutubeVideo) videoDir() string { func (v *YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id return v.dir + "/" + v.id
} }
func (v YoutubeVideo) delete() error { func (v *YoutubeVideo) delete() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Remove(videoPath) err := os.Remove(videoPath)
if err != nil { if err != nil {
@ -143,7 +143,7 @@ func (v YoutubeVideo) delete() error {
return nil return nil
} }
func (v YoutubeVideo) triggerThumbnailSave() error { func (v *YoutubeVideo) triggerThumbnailSave() error {
client := &http.Client{Timeout: 30 * time.Second} client := &http.Client{Timeout: 30 * time.Second}
params, err := json.Marshal(map[string]string{"videoid": v.id}) params, err := json.Marshal(map[string]string{"videoid": v.id})
@ -186,7 +186,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s } func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
if channelID == "" { if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
} }
@ -204,11 +204,11 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
func (v YoutubeVideo) Size() *int64 { func (v *YoutubeVideo) Size() *int64 {
return v.size return v.size
} }
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -220,9 +220,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
if err != nil { if err != nil {
return nil, err return nil, err
} }
*v.size = fi.Size() videoSize := fi.Size()
v.size = &videoSize
if fi.Size() > int64(maxVideoSize)*1024*1024 { if videoSize > int64(maxVideoSize)*1024*1024 {
//delete the video and ignore the error //delete the video and ignore the error
_ = v.delete() _ = v.delete()
return nil, errors.Err("the video is too big to sync, skipping for now") return nil, errors.Err("the video is too big to sync, skipping for now")

View file

@ -38,6 +38,7 @@ import (
const ( const (
channelClaimAmount = 0.01 channelClaimAmount = 0.01
publishAmount = 0.01 publishAmount = 0.01
maxReasonLength = 500
) )
type video interface { type video interface {
@ -237,7 +238,7 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop() s.grp.Stop()
}() }()
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil { if err != nil {
return err return err
} }
@ -299,14 +300,15 @@ func (s *Sync) updateChannelStatus(e *error) {
if util.SubstringInSlice((*e).Error(), noFailConditions) { if util.SubstringInSlice((*e).Error(), noFailConditions) {
return return
} }
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) failureReason := (*e).Error()
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err) err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e) *e = errors.Prefix(err.Error(), *e)
} }
} else if !s.IsInterrupted() { } else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil { if err != nil {
*e = err *e = err
} }
@ -344,7 +346,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
err := s.uploadWallet() err := s.uploadWallet()
if err != nil { if err != nil {
if *e == nil { if *e == nil {
e = &err e = &err //not 100% sure
return return
} else { } else {
*e = errors.Prefix("failure uploading wallet: ", *e) *e = errors.Prefix("failure uploading wallet: ", *e)