Better tracking (Size and failure reason) #35

Merged
nikooo777 merged 8 commits from better-tracking into master 2018-09-26 22:26:49 +02:00
8 changed files with 239 additions and 119 deletions

View file

@ -456,3 +456,15 @@ func (d *Client) NumClaimsInChannel(url string) (uint64, error) {
} }
return channel.ClaimsInChannel, nil return channel.ClaimsInChannel, nil
} }
func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) {
response := new(ClaimListMineResponse)
err := d.call(response, "claim_list_mine", map[string]interface{}{})
if err != nil {
return nil, err
} else if response == nil {
return nil, errors.Err("no response")
}
return response, nil
}

View file

@ -33,15 +33,22 @@ type Support struct {
type Claim struct { type Claim struct {
Address string `json:"address"` Address string `json:"address"`
Amount decimal.Decimal `json:"amount"` Amount decimal.Decimal `json:"amount"`
BlocksToExpiration int `json:"blocks_to_expiration"`
Category string `json:"category"`
ClaimID string `json:"claim_id"` ClaimID string `json:"claim_id"`
ClaimSequence int `json:"claim_sequence"` ClaimSequence int `json:"claim_sequence"`
Confirmations int `json:"confirmations"`
DecodedClaim bool `json:"decoded_claim"` DecodedClaim bool `json:"decoded_claim"`
Depth int `json:"depth"` Depth int `json:"depth"`
EffectiveAmount decimal.Decimal `json:"effective_amount"` EffectiveAmount decimal.Decimal `json:"effective_amount"`
ExpirationHeight int `json:"expiration_height"`
Expired bool `json:"expired"`
Height int `json:"height"` Height int `json:"height"`
Hex string `json:"hex"` Hex string `json:"hex"`
IsSpent bool `json:"is_spent"`
Name string `json:"name"` Name string `json:"name"`
Nout int `json:"nout"` Nout int `json:"nout"`
PermanentUrl string `json:"permanent_url"`
Supports []Support `json:"supports"` Supports []Support `json:"supports"`
Txid string `json:"txid"` Txid string `json:"txid"`
ValidAtHeight int `json:"valid_at_height"` ValidAtHeight int `json:"valid_at_height"`
@ -234,7 +241,7 @@ type ClaimListResponse struct {
LastTakeoverHeight int `json:"last_takeover_height"` LastTakeoverHeight int `json:"last_takeover_height"`
SupportsWithoutClaims []Support `json:"supports_without_claims"` SupportsWithoutClaims []Support `json:"supports_without_claims"`
} }
type ClaimListMineResponse []Claim
type ClaimShowResponse Claim type ClaimShowResponse Claim
type PeerListResponsePeer struct { type PeerListResponsePeer struct {

View file

@ -70,7 +70,7 @@ type apiYoutubeChannel struct {
SyncServer null.String `json:"sync_server"` SyncServer null.String `json:"sync_server"`
} }
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
endpoint := s.ApiURL + "/yt/jobs" endpoint := s.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {s.ApiToken}, "auth_token": {s.ApiToken},
@ -105,35 +105,41 @@ type syncedVideo struct {
VideoID string `json:"video_id"` VideoID string `json:"video_id"`
Published bool `json:"published"` Published bool `json:"published"`
FailureReason string `json:"failure_reason"` FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
} }
func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) { func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
endpoint := s.ApiURL + "/yt/channel_status" endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID}, "channel_id": {channelID},
"sync_server": {s.HostName}, "sync_server": {s.HostName},
"auth_token": {s.ApiToken}, "auth_token": {s.ApiToken},
"sync_status": {status}, "sync_status": {status},
"failure_reason": {failureReason},
}) })
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse var response apiChannelStatusResponse
err := json.Unmarshal(body, &response) err := json.Unmarshal(body, &response)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if !response.Error.IsNull() { if !response.Error.IsNull() {
return nil, errors.Err(response.Error.String) return nil, nil, errors.Err(response.Error.String)
} }
if response.Data != nil { if response.Data != nil {
svs := make(map[string]syncedVideo) svs := make(map[string]syncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data { for _, v := range response.Data {
svs[v.VideoID] = v svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
} }
return svs, nil return svs, claimNames, nil
} }
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
} }
const ( const (
@ -141,9 +147,11 @@ const (
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
) )
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := s.ApiURL + "/yt/video_status" endpoint := s.ApiURL + "/yt/video_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
vals := url.Values{ vals := url.Values{
"youtube_channel_id": {channelID}, "youtube_channel_id": {channelID},
"video_id": {videoID}, "video_id": {videoID},
@ -157,12 +165,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10)) vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID) vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName) vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
} }
if failureReason != "" { if failureReason != "" {
maxReasonLength := 500
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:500]
}
vals.Add("failure_reason", failureReason) vals.Add("failure_reason", failureReason)
} }
res, _ := http.PostForm(endpoint, vals) res, _ := http.PostForm(endpoint, vals)
@ -186,7 +193,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
return errors.Err("invalid API response. Status code: %d", res.StatusCode) return errors.Err("invalid API response. Status code: %d", res.StatusCode)
} }
func (s SyncManager) Start() error { func (s *SyncManager) Start() error {
syncCount := 0 syncCount := 0
for { for {
err := s.checkUsedSpace() err := s.checkUsedSpace()
@ -220,7 +227,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.AwsS3Secret,
@ -255,7 +262,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.AwsS3Secret,
@ -305,11 +312,11 @@ func (s SyncManager) Start() error {
return nil return nil
} }
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool { func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName) return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
} }
func (s SyncManager) checkUsedSpace() error { func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir) usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil { if err != nil {
return err return err

View file

@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error {
return nil return nil
} }
s.syncedVideosMux.Lock() s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.Unlock() s.syncedVideosMux.RUnlock()
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already published %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit { if numOnSource-numPublished > s.Manager.VideosLimit {

View file

@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string {
return name + suffix return name + suffix
} }
var publishedNamesMutex sync.RWMutex func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
var publishedNames = map[string]bool{}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
attempt := 0 attempt := 0
for { for {
attempt++ attempt++
name := getClaimNameFromTitle(title, attempt) name := getClaimNameFromTitle(title, attempt)
publishedNamesMutex.RLock() syncedVideosMux.Lock()
_, exists := publishedNames[name] _, exists := claimNames[name]
publishedNamesMutex.RUnlock()
if exists { if exists {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt) log.Printf("name exists, retrying (%d attempts so far)", attempt)
syncedVideosMux.Unlock()
continue continue
} }
claimNames[name] = false
syncedVideosMux.Unlock()
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 { if len(name) < 2 {
hasher := md5.New() hasher := md5.New()
@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
response, err := daemon.Publish(name, filename, amount, options) response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
publishedNamesMutex.Lock() syncedVideosMux.Lock()
publishedNames[name] = true claimNames[name] = true
publishedNamesMutex.Unlock() syncedVideosMux.Unlock()
if err == nil { if err == nil {
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else { } else {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt) log.Printf("name exists, retrying (%d attempts so far)", attempt)
continue continue
} }
} else { } else {

View file

@ -8,6 +8,8 @@ import (
"strings" "strings"
"time" "time"
"sync"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
@ -26,11 +28,13 @@ type ucbVideo struct {
description string description string
publishedAt time.Time publishedAt time.Time
dir string dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
} }
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo { func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
return ucbVideo{ return &ucbVideo{
id: id, id: id,
title: title, title: title,
description: description, description: description,
@ -40,19 +44,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi
} }
} }
lyoshenka commented 2018-08-27 19:59:17 +02:00 (Migrated from github.com)
Review

why are these fields here if they never get used?

why are these fields here if they never get used?
lyoshenka commented 2018-08-27 20:00:08 +02:00 (Migrated from github.com)
Review

and having syncedVideosMux on each video is weird. whats going on?

and having syncedVideosMux on each video is weird. whats going on?
nikooo777 commented 2018-08-27 20:12:54 +02:00 (Migrated from github.com)
Review

there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim.
I however agree with your comment about passing down the mutex, I'm going to correct that.

there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim. I however agree with your comment about passing down the mutex, I'm going to correct that.
func (v ucbVideo) ID() string { func (v *ucbVideo) ID() string {
return v.id return v.id
} }
func (v ucbVideo) PlaylistPosition() int { func (v *ucbVideo) PlaylistPosition() int {
return 0 return 0
} }
func (v ucbVideo) IDAndNum() string { func (v *ucbVideo) IDAndNum() string {
return v.ID() + " (?)" return v.ID() + " (?)"
} }
func (v ucbVideo) PublishedAt() time.Time { func (v *ucbVideo) PublishedAt() time.Time {
return v.publishedAt return v.publishedAt
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`) //r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
//matches := r.FindStringSubmatch(v.title) //matches := r.FindStringSubmatch(v.title)
@ -65,11 +69,11 @@ func (v ucbVideo) PublishedAt() time.Time {
//return time.Now() //return time.Now()
} }
func (v ucbVideo) getFilename() string { func (v *ucbVideo) getFilename() string {
return v.dir + "/" + v.id + ".mp4" return v.dir + "/" + v.id + ".mp4"
} }
func (v ucbVideo) getClaimName(attempt int) string { func (v *ucbVideo) getClaimName(attempt int) string {
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
suffix := "" suffix := ""
if attempt > 1 { if attempt > 1 {
@ -98,7 +102,7 @@ func (v ucbVideo) getClaimName(attempt int) string {
return name + suffix return name + suffix
} }
func (v ucbVideo) getAbbrevDescription() string { func (v *ucbVideo) getAbbrevDescription() string {
maxLines := 10 maxLines := 10
description := strings.TrimSpace(v.description) description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines { if strings.Count(description, "\n") < maxLines {
@ -107,7 +111,7 @@ func (v ucbVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
} }
func (v ucbVideo) download() error { func (v *ucbVideo) download() error {
videoPath := v.getFilename() videoPath := v.getFilename()
_, err := os.Stat(videoPath) _, err := os.Stat(videoPath)
@ -146,7 +150,7 @@ func (v ucbVideo) download() error {
return nil return nil
} }
func (v ucbVideo) saveThumbnail() error { func (v *ucbVideo) saveThumbnail() error {
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id) resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
if err != nil { if err != nil {
return err return err
@ -170,7 +174,7 @@ func (v ucbVideo) saveThumbnail() error {
return err return err
} }
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{ options := jsonrpc.PublishOptions{
Title: &v.title, Title: &v.title,
Author: strPtr("UC Berkeley"), Author: strPtr("UC Berkeley"),
@ -183,10 +187,16 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
ChangeAddress: &claimAddress, ChangeAddress: &claimAddress,
} }
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
} }
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { func (v *ucbVideo) Size() *int64 {
return nil
}
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {

View file

@ -11,6 +11,8 @@ import (
"strings" "strings"
"time" "time"
"sync"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
@ -25,13 +27,16 @@ type YoutubeVideo struct {
title string title string
description string description string
playlistPosition int64 playlistPosition int64
size *int64
publishedAt time.Time publishedAt time.Time
dir string dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
} }
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo { func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
return YoutubeVideo{ return &YoutubeVideo{
id: snippet.ResourceId.VideoId, id: snippet.ResourceId.VideoId,
title: snippet.Title, title: snippet.Title,
description: snippet.Description, description: snippet.Description,
@ -42,23 +47,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You
} }
} }
func (v YoutubeVideo) ID() string { func (v *YoutubeVideo) ID() string {
return v.id return v.id
} }
func (v YoutubeVideo) PlaylistPosition() int { func (v *YoutubeVideo) PlaylistPosition() int {
return int(v.playlistPosition) return int(v.playlistPosition)
} }
func (v YoutubeVideo) IDAndNum() string { func (v *YoutubeVideo) IDAndNum() string {
return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)" return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
} }
func (v YoutubeVideo) PublishedAt() time.Time { func (v *YoutubeVideo) PublishedAt() time.Time {
return v.publishedAt return v.publishedAt
} }
func (v YoutubeVideo) getFilename() string { func (v *YoutubeVideo) getFilename() string {
maxLen := 30 maxLen := 30
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
@ -85,7 +90,7 @@ func (v YoutubeVideo) getFilename() string {
return v.videoDir() + "/" + name + ".mp4" return v.videoDir() + "/" + name + ".mp4"
} }
func (v YoutubeVideo) getAbbrevDescription() string { func (v *YoutubeVideo) getAbbrevDescription() string {
maxLines := 10 maxLines := 10
description := strings.TrimSpace(v.description) description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines { if strings.Count(description, "\n") < maxLines {
@ -94,7 +99,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
} }
func (v YoutubeVideo) download() error { func (v *YoutubeVideo) download() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Mkdir(v.videoDir(), 0750) err := os.Mkdir(v.videoDir(), 0750)
@ -118,20 +123,19 @@ func (v YoutubeVideo) download() error {
var downloadedFile *os.File var downloadedFile *os.File
downloadedFile, err = os.Create(videoPath) downloadedFile, err = os.Create(videoPath)
defer downloadedFile.Close()
if err != nil { if err != nil {
return err return err
} }
defer downloadedFile.Close() return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile)
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
} }
func (v YoutubeVideo) videoDir() string { func (v *YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id return v.dir + "/" + v.id
} }
func (v YoutubeVideo) delete() error { func (v *YoutubeVideo) delete() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Remove(videoPath) err := os.Remove(videoPath)
if err != nil { if err != nil {
@ -142,7 +146,7 @@ func (v YoutubeVideo) delete() error {
return nil return nil
} }
func (v YoutubeVideo) triggerThumbnailSave() error { func (v *YoutubeVideo) triggerThumbnailSave() error {
client := &http.Client{Timeout: 30 * time.Second} client := &http.Client{Timeout: 30 * time.Second}
params, err := json.Marshal(map[string]string{"videoid": v.id}) params, err := json.Marshal(map[string]string{"videoid": v.id})
@ -167,17 +171,17 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
} }
var decoded struct { var decoded struct {
error int `json:"error"` Error int `json:"error"`
url string `json:"url,omitempty"` Url string `json:"url,omitempty"`
message string `json:"message,omitempty"` Message string `json:"message,omitempty"`
} }
err = json.Unmarshal(contents, &decoded) err = json.Unmarshal(contents, &decoded)
if err != nil { if err != nil {
return err return err
} }
if decoded.error != 0 { if decoded.Error != 0 {
return errors.Err("error creating thumbnail: " + decoded.message) return errors.Err("error creating thumbnail: " + decoded.Message)
} }
return nil return nil
@ -185,7 +189,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s } func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
if channelID == "" { if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
} }
@ -200,10 +204,16 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
ChangeAddress: &claimAddress, ChangeAddress: &claimAddress,
ChannelID: &channelID, ChannelID: &channelID,
} }
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
} }
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { func (v *YoutubeVideo) Size() *int64 {
return v.size
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -215,7 +225,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
if err != nil { if err != nil {
return nil, err return nil, err
} }
if fi.Size() > int64(maxVideoSize)*1024*1024 { videoSize := fi.Size()
v.size = &videoSize
if videoSize > int64(maxVideoSize)*1024*1024 {
//delete the video and ignore the error //delete the video and ignore the error
_ = v.delete() _ = v.delete()
return nil, errors.Err("the video is too big to sync, skipping for now") return nil, errors.Err("the video is too big to sync, skipping for now")

View file

@ -38,14 +38,16 @@ import (
const ( const (
channelClaimAmount = 0.01 channelClaimAmount = 0.01
publishAmount = 0.01 publishAmount = 0.01
maxReasonLength = 500
) )
type video interface { type video interface {
Size() *int64
ID() string ID() string
IDAndNum() string IDAndNum() string
PlaylistPosition() int PlaylistPosition() int
PublishedAt() time.Time PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error) Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
} }
// sorting videos // sorting videos
@ -76,8 +78,9 @@ type Sync struct {
claimAddress string claimAddress string
videoDirectory string videoDirectory string
db *redisdb.DB db *redisdb.DB
syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo syncedVideos map[string]syncedVideo
syncedVideosMux *sync.Mutex claimNames map[string]bool
grp *stop.Group grp *stop.Group
lbryChannelID string lbryChannelID string
@ -85,7 +88,7 @@ type Sync struct {
queue chan video queue chan video
} }
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock() defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{ s.syncedVideos[videoID] = syncedVideo{
@ -93,6 +96,9 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
Published: published, Published: published,
FailureReason: failureReason, FailureReason: failureReason,
} }
if claimName != "" {
s.claimNames[claimName] = true
}
} }
// SendErrorToSlack Sends an error message to the default channel and to the process log. // SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -223,7 +229,7 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" { if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided") return errors.Err("channel ID not provided")
} }
s.syncedVideosMux = &sync.Mutex{} s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{} s.walletMux = &sync.Mutex{}
s.db = redisdb.New() s.db = redisdb.New()
s.grp = stop.New() s.grp = stop.New()
@ -236,12 +242,13 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop() s.grp.Stop()
}() }()
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil { if err != nil {
return err return err
} }
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock() s.syncedVideosMux.Unlock()
defer s.updateChannelStatus(&e) defer s.updateChannelStatus(&e)
@ -298,14 +305,15 @@ func (s *Sync) updateChannelStatus(e *error) {
if util.SubstringInSlice((*e).Error(), noFailConditions) { if util.SubstringInSlice((*e).Error(), noFailConditions) {
return return
} }
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) failureReason := (*e).Error()
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err) err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e) *e = errors.Prefix(err.Error(), *e)
} }
} else if !s.IsInterrupted() { } else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil { if err != nil {
*e = err *e = err
} }
@ -343,7 +351,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
err := s.uploadWallet() err := s.uploadWallet()
if err != nil { if err != nil {
if *e == nil { if *e == nil {
e = &err e = &err //not 100% sure
return return
} else { } else {
*e = errors.Prefix("failure uploading wallet: ", *e) *e = errors.Prefix("failure uploading wallet: ", *e)
@ -357,9 +365,71 @@ func logShutdownError(shutdownErr error) {
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
} }
func hasDupes(claims []jsonrpc.Claim) (bool, error) {
videoIDs := make(map[string]interface{})
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[:strings.LastIndex(tn, "/")+1]
_, ok := videoIDs[videoID]
if !ok {
videoIDs[videoID] = nil
continue
}
return true, nil
}
return false, nil
}
//publishesCount counts the amount of videos published so far
func publishesCount(claims []jsonrpc.Claim) (int, error) {
count := 0
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
count++
}
return count, nil
}
func (s *Sync) doSync() error { func (s *Sync) doSync() error {
var err error var err error
claims, err := s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims: ", err)
}
hasDupes, err := hasDupes(*claims)
if err != nil {
return errors.Prefix("error checking for duplicates: ", err)
}
if hasDupes {
return errors.Err("channel has duplicates! Manual fix required")
}
pubsOnWallet, err := publishesCount(*claims)
if err != nil {
return errors.Prefix("error counting claims: ", err)
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
if sv.Published {
pubsOnDB++
}
}
if pubsOnWallet > pubsOnDB {
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
err = s.walletSetup() err = s.walletSetup()
if err != nil { if err != nil {
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err) return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
@ -371,10 +441,10 @@ func (s *Sync) doSync() error {
for i := 0; i < s.ConcurrentVideos; i++ { for i := 0; i < s.ConcurrentVideos; i++ {
s.grp.Add(1) s.grp.Add(1)
go func() { go func(i int) {
defer s.grp.Done() defer s.grp.Done()
lyoshenka commented 2018-08-27 20:01:24 +02:00 (Migrated from github.com)
Review

why does this function return a pointer to an error?

why does this function return a pointer to an error?
nikooo777 commented 2018-08-27 20:11:34 +02:00 (Migrated from github.com)
Review

because the function is run as a deferred and needs to read/write to the error being returned by the original function. Is this a bad pattern?

because the function is run as a deferred and needs to read/write to the error being returned by the original function. Is this a bad pattern?
nikooo777 commented 2018-08-27 20:18:36 +02:00 (Migrated from github.com)
Review

well actually to be correct it's not returning a pointer to an error, it's accepting a pointer to an error so that I can change it from within.

well actually to be correct it's not returning a pointer to an error, it's accepting a pointer to an error so that I can change it from within.
s.startWorker(i) s.startWorker(i)
}() }(i)
} }
if s.LbryChannelName == "@UCBerkeley" { if s.LbryChannelName == "@UCBerkeley" {
@ -469,8 +539,8 @@ func (s *Sync) startWorker(workerNum int) {
} }
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
} }
s.AppendSyncedVideo(v.ID(), false, err.Error()) s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error()) err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }
@ -627,9 +697,9 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String()) log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now()) }(time.Now())
s.syncedVideosMux.Lock() s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()] sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.Unlock() s.syncedVideosMux.RUnlock()
alreadyPublished := ok && sv.Published alreadyPublished := ok && sv.Published
neverRetryFailures := []string{ neverRetryFailures := []string{
@ -667,15 +737,16 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil { if err != nil {
return err return err
} }
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
if err != nil { if err != nil {
return err return err
} }
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "") err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil { if err != nil {
return err SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }
s.AppendSyncedVideo(v.ID(), true, "")
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
return nil return nil
} }