Better tracking (Size and failure reason) #35
6 changed files with 59 additions and 41 deletions
|
@ -105,9 +105,10 @@ type syncedVideo struct {
|
||||||
VideoID string `json:"video_id"`
|
VideoID string `json:"video_id"`
|
||||||
Published bool `json:"published"`
|
Published bool `json:"published"`
|
||||||
FailureReason string `json:"failure_reason"`
|
FailureReason string `json:"failure_reason"`
|
||||||
|
ClaimName string `json:"claim_name"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, error) {
|
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
|
||||||
endpoint := s.ApiURL + "/yt/channel_status"
|
endpoint := s.ApiURL + "/yt/channel_status"
|
||||||
if len(failureReason) > maxReasonLength {
|
if len(failureReason) > maxReasonLength {
|
||||||
failureReason = failureReason[:maxReasonLength]
|
failureReason = failureReason[:maxReasonLength]
|
||||||
|
@ -124,19 +125,21 @@ func (s *SyncManager) setChannelStatus(channelID string, status string, failureR
|
||||||
var response apiChannelStatusResponse
|
var response apiChannelStatusResponse
|
||||||
err := json.Unmarshal(body, &response)
|
err := json.Unmarshal(body, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if !response.Error.IsNull() {
|
if !response.Error.IsNull() {
|
||||||
return nil, errors.Err(response.Error.String)
|
return nil, nil, errors.Err(response.Error.String)
|
||||||
}
|
}
|
||||||
if response.Data != nil {
|
if response.Data != nil {
|
||||||
svs := make(map[string]syncedVideo)
|
svs := make(map[string]syncedVideo)
|
||||||
|
claimNames := make(map[string]bool)
|
||||||
for _, v := range response.Data {
|
for _, v := range response.Data {
|
||||||
svs[v.VideoID] = v
|
svs[v.VideoID] = v
|
||||||
|
claimNames[v.ClaimName] = v.Published
|
||||||
}
|
}
|
||||||
return svs, nil
|
return svs, claimNames, nil
|
||||||
}
|
}
|
||||||
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
|
@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.RLock()
|
||||||
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
|
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
|
||||||
s.syncedVideosMux.Unlock()
|
s.syncedVideosMux.RUnlock()
|
||||||
log.Debugf("We already published %d videos", numPublished)
|
log.Debugf("We already published %d videos", numPublished)
|
||||||
|
|
||||||
if numOnSource-numPublished > s.Manager.VideosLimit {
|
if numOnSource-numPublished > s.Manager.VideosLimit {
|
||||||
|
|
|
@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string {
|
||||||
return name + suffix
|
return name + suffix
|
||||||
}
|
}
|
||||||
|
|
||||||
var publishedNamesMutex sync.RWMutex
|
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||||
var publishedNames = map[string]bool{}
|
|
||||||
|
|
||||||
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
|
|
||||||
attempt := 0
|
attempt := 0
|
||||||
for {
|
for {
|
||||||
attempt++
|
attempt++
|
||||||
name := getClaimNameFromTitle(title, attempt)
|
name := getClaimNameFromTitle(title, attempt)
|
||||||
|
|
||||||
publishedNamesMutex.RLock()
|
syncedVideosMux.Lock()
|
||||||
_, exists := publishedNames[name]
|
_, exists := claimNames[name]
|
||||||
publishedNamesMutex.RUnlock()
|
|
||||||
if exists {
|
if exists {
|
||||||
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
|
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
||||||
|
syncedVideosMux.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
claimNames[name] = false
|
||||||
|
syncedVideosMux.Unlock()
|
||||||
|
|
||||||
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
||||||
if len(name) < 2 {
|
if len(name) < 2 {
|
||||||
hasher := md5.New()
|
hasher := md5.New()
|
||||||
|
@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
|
||||||
|
|
||||||
response, err := daemon.Publish(name, filename, amount, options)
|
response, err := daemon.Publish(name, filename, amount, options)
|
||||||
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
|
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
|
||||||
publishedNamesMutex.Lock()
|
syncedVideosMux.Lock()
|
||||||
publishedNames[name] = true
|
claimNames[name] = true
|
||||||
publishedNamesMutex.Unlock()
|
syncedVideosMux.Unlock()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
||||||
} else {
|
} else {
|
||||||
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
|
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
|
@ -20,12 +22,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ucbVideo struct {
|
type ucbVideo struct {
|
||||||
id string
|
id string
|
||||||
title string
|
title string
|
||||||
channel string
|
channel string
|
||||||
description string
|
description string
|
||||||
publishedAt time.Time
|
publishedAt time.Time
|
||||||
dir string
|
dir string
|
||||||
|
claimNames map[string]bool
|
||||||
|
syncedVideosMux *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
|
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
|
||||||
|
@ -183,14 +187,16 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f
|
||||||
ChangeAddress: &claimAddress,
|
ChangeAddress: &claimAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *ucbVideo) Size() *int64 {
|
func (v *ucbVideo) Size() *int64 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
|
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||||
|
v.claimNames = claimNames
|
||||||
|
v.syncedVideosMux = syncedVideosMux
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -11,6 +11,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/errors"
|
"github.com/lbryio/lbry.go/errors"
|
||||||
"github.com/lbryio/lbry.go/jsonrpc"
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
|
|
||||||
|
@ -28,6 +30,8 @@ type YoutubeVideo struct {
|
||||||
size *int64
|
size *int64
|
||||||
publishedAt time.Time
|
publishedAt time.Time
|
||||||
dir string
|
dir string
|
||||||
|
claimNames map[string]bool
|
||||||
|
syncedVideosMux *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
|
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
|
||||||
|
@ -201,14 +205,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
|
||||||
ChangeAddress: &claimAddress,
|
ChangeAddress: &claimAddress,
|
||||||
ChannelID: &channelID,
|
ChannelID: &channelID,
|
||||||
}
|
}
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *YoutubeVideo) Size() *int64 {
|
func (v *YoutubeVideo) Size() *int64 {
|
||||||
return v.size
|
return v.size
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
|
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||||
|
v.claimNames = claimNames
|
||||||
|
v.syncedVideosMux = syncedVideosMux
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -47,7 +47,7 @@ type video interface {
|
||||||
IDAndNum() string
|
IDAndNum() string
|
||||||
PlaylistPosition() int
|
PlaylistPosition() int
|
||||||
PublishedAt() time.Time
|
PublishedAt() time.Time
|
||||||
Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error)
|
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sorting videos
|
// sorting videos
|
||||||
|
@ -78,8 +78,9 @@ type Sync struct {
|
||||||
claimAddress string
|
claimAddress string
|
||||||
videoDirectory string
|
videoDirectory string
|
||||||
db *redisdb.DB
|
db *redisdb.DB
|
||||||
|
syncedVideosMux *sync.RWMutex
|
||||||
syncedVideos map[string]syncedVideo
|
syncedVideos map[string]syncedVideo
|
||||||
syncedVideosMux *sync.Mutex
|
claimNames map[string]bool
|
||||||
grp *stop.Group
|
grp *stop.Group
|
||||||
lbryChannelID string
|
lbryChannelID string
|
||||||
|
|
||||||
|
@ -87,7 +88,7 @@ type Sync struct {
|
||||||
queue chan video
|
queue chan video
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
|
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.Lock()
|
||||||
defer s.syncedVideosMux.Unlock()
|
defer s.syncedVideosMux.Unlock()
|
||||||
s.syncedVideos[videoID] = syncedVideo{
|
s.syncedVideos[videoID] = syncedVideo{
|
||||||
|
@ -95,6 +96,7 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
|
||||||
Published: published,
|
Published: published,
|
||||||
FailureReason: failureReason,
|
FailureReason: failureReason,
|
||||||
}
|
}
|
||||||
|
s.claimNames[claimName] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
||||||
|
@ -225,7 +227,7 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
if s.YoutubeChannelID == "" {
|
if s.YoutubeChannelID == "" {
|
||||||
return errors.Err("channel ID not provided")
|
return errors.Err("channel ID not provided")
|
||||||
}
|
}
|
||||||
s.syncedVideosMux = &sync.Mutex{}
|
s.syncedVideosMux = &sync.RWMutex{}
|
||||||
s.walletMux = &sync.Mutex{}
|
s.walletMux = &sync.Mutex{}
|
||||||
s.db = redisdb.New()
|
s.db = redisdb.New()
|
||||||
s.grp = stop.New()
|
s.grp = stop.New()
|
||||||
|
@ -238,12 +240,13 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
||||||
s.grp.Stop()
|
s.grp.Stop()
|
||||||
}()
|
}()
|
||||||
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
|
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.Lock()
|
||||||
s.syncedVideos = syncedVideos
|
s.syncedVideos = syncedVideos
|
||||||
|
s.claimNames = claimNames
|
||||||
s.syncedVideosMux.Unlock()
|
s.syncedVideosMux.Unlock()
|
||||||
|
|
||||||
defer s.updateChannelStatus(&e)
|
defer s.updateChannelStatus(&e)
|
||||||
|
@ -301,14 +304,14 @@ func (s *Sync) updateChannelStatus(e *error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
failureReason := (*e).Error()
|
failureReason := (*e).Error()
|
||||||
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
|
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
||||||
err = errors.Prefix(msg, err)
|
err = errors.Prefix(msg, err)
|
||||||
*e = errors.Prefix(err.Error(), *e)
|
*e = errors.Prefix(err.Error(), *e)
|
||||||
}
|
}
|
||||||
} else if !s.IsInterrupted() {
|
} else if !s.IsInterrupted() {
|
||||||
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
|
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
*e = err
|
*e = err
|
||||||
}
|
}
|
||||||
|
@ -472,7 +475,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
}
|
}
|
||||||
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
||||||
}
|
}
|
||||||
s.AppendSyncedVideo(v.ID(), false, err.Error())
|
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||||
|
@ -630,9 +633,9 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
log.Println(v.ID() + " took " + time.Since(start).String())
|
log.Println(v.ID() + " took " + time.Since(start).String())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.RLock()
|
||||||
sv, ok := s.syncedVideos[v.ID()]
|
sv, ok := s.syncedVideos[v.ID()]
|
||||||
s.syncedVideosMux.Unlock()
|
s.syncedVideosMux.RUnlock()
|
||||||
alreadyPublished := ok && sv.Published
|
alreadyPublished := ok && sv.Published
|
||||||
|
|
||||||
neverRetryFailures := []string{
|
neverRetryFailures := []string{
|
||||||
|
@ -670,7 +673,7 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
|
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -679,7 +682,7 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
s.AppendSyncedVideo(v.ID(), true, "")
|
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue