Better tracking (Size and failure reason) #35

Merged
nikooo777 merged 8 commits from better-tracking into master 2018-09-26 22:26:49 +02:00
8 changed files with 239 additions and 119 deletions

View file

@ -456,3 +456,15 @@ func (d *Client) NumClaimsInChannel(url string) (uint64, error) {
}
return channel.ClaimsInChannel, nil
}
func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) {
response := new(ClaimListMineResponse)
err := d.call(response, "claim_list_mine", map[string]interface{}{})
if err != nil {
return nil, err
} else if response == nil {
return nil, errors.Err("no response")
}
return response, nil
}

View file

@ -33,15 +33,22 @@ type Support struct {
type Claim struct {
Address string `json:"address"`
Amount decimal.Decimal `json:"amount"`
BlocksToExpiration int `json:"blocks_to_expiration"`
Category string `json:"category"`
ClaimID string `json:"claim_id"`
ClaimSequence int `json:"claim_sequence"`
Confirmations int `json:"confirmations"`
DecodedClaim bool `json:"decoded_claim"`
Depth int `json:"depth"`
EffectiveAmount decimal.Decimal `json:"effective_amount"`
ExpirationHeight int `json:"expiration_height"`
Expired bool `json:"expired"`
Height int `json:"height"`
Hex string `json:"hex"`
IsSpent bool `json:"is_spent"`
Name string `json:"name"`
Nout int `json:"nout"`
PermanentUrl string `json:"permanent_url"`
Supports []Support `json:"supports"`
Txid string `json:"txid"`
ValidAtHeight int `json:"valid_at_height"`
@ -234,7 +241,7 @@ type ClaimListResponse struct {
LastTakeoverHeight int `json:"last_takeover_height"`
SupportsWithoutClaims []Support `json:"supports_without_claims"`
}
type ClaimListMineResponse []Claim
type ClaimShowResponse Claim
type PeerListResponsePeer struct {

View file

@ -70,7 +70,7 @@ type apiYoutubeChannel struct {
SyncServer null.String `json:"sync_server"`
}
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
endpoint := s.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {s.ApiToken},
@ -105,35 +105,41 @@ type syncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) {
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {s.HostName},
"auth_token": {s.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, errors.Err(response.Error.String)
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]syncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, nil
return svs, claimNames, nil
}
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
@ -141,9 +147,11 @@ const (
VideoStatusFailed = "failed"
)
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error {
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := s.ApiURL + "/yt/video_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
@ -157,12 +165,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
maxReasonLength := 500
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:500]
}
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
@ -186,7 +193,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
func (s SyncManager) Start() error {
func (s *SyncManager) Start() error {
syncCount := 0
for {
err := s.checkUsedSpace()
@ -220,7 +227,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
@ -255,7 +262,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
@ -305,11 +312,11 @@ func (s SyncManager) Start() error {
return nil
}
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
}
func (s SyncManager) checkUsedSpace() error {
func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil {
return err

View file

@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error {
return nil
}
s.syncedVideosMux.Lock()
s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.Unlock()
s.syncedVideosMux.RUnlock()
log.Debugf("We already published %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit {

View file

@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string {
return name + suffix
}
var publishedNamesMutex sync.RWMutex
var publishedNames = map[string]bool{}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
attempt := 0
for {
attempt++
name := getClaimNameFromTitle(title, attempt)
publishedNamesMutex.RLock()
_, exists := publishedNames[name]
publishedNamesMutex.RUnlock()
syncedVideosMux.Lock()
_, exists := claimNames[name]
if exists {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
log.Printf("name exists, retrying (%d attempts so far)", attempt)
syncedVideosMux.Unlock()
continue
}
claimNames[name] = false
syncedVideosMux.Unlock()
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
hasher := md5.New()
@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
publishedNamesMutex.Lock()
publishedNames[name] = true
publishedNamesMutex.Unlock()
syncedVideosMux.Lock()
claimNames[name] = true
syncedVideosMux.Unlock()
if err == nil {
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
log.Printf("name exists, retrying (%d attempts so far)", attempt)
continue
}
} else {

View file

@ -8,6 +8,8 @@ import (
"strings"
"time"
"sync"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@ -26,11 +28,13 @@ type ucbVideo struct {
description string
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo {
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
return ucbVideo{
return &ucbVideo{
id: id,
title: title,
description: description,
@ -40,19 +44,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi
}
}
lyoshenka commented 2018-08-27 19:59:17 +02:00 (Migrated from github.com)
Review

why are these fields here if they never get used?

why are these fields here if they never get used?
lyoshenka commented 2018-08-27 20:00:08 +02:00 (Migrated from github.com)
Review

and having syncedVideosMux on each video is weird. whats going on?

and having syncedVideosMux on each video is weird. whats going on?
nikooo777 commented 2018-08-27 20:12:54 +02:00 (Migrated from github.com)
Review

there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim.
I however agree with your comment about passing down the mutex, I'm going to correct that.

there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim. I however agree with your comment about passing down the mutex, I'm going to correct that.
func (v ucbVideo) ID() string {
func (v *ucbVideo) ID() string {
return v.id
}
func (v ucbVideo) PlaylistPosition() int {
func (v *ucbVideo) PlaylistPosition() int {
return 0
}
func (v ucbVideo) IDAndNum() string {
func (v *ucbVideo) IDAndNum() string {
return v.ID() + " (?)"
}
func (v ucbVideo) PublishedAt() time.Time {
func (v *ucbVideo) PublishedAt() time.Time {
return v.publishedAt
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
//matches := r.FindStringSubmatch(v.title)
@ -65,11 +69,11 @@ func (v ucbVideo) PublishedAt() time.Time {
//return time.Now()
}
func (v ucbVideo) getFilename() string {
func (v *ucbVideo) getFilename() string {
return v.dir + "/" + v.id + ".mp4"
}
func (v ucbVideo) getClaimName(attempt int) string {
func (v *ucbVideo) getClaimName(attempt int) string {
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
suffix := ""
if attempt > 1 {
@ -98,7 +102,7 @@ func (v ucbVideo) getClaimName(attempt int) string {
return name + suffix
}
func (v ucbVideo) getAbbrevDescription() string {
func (v *ucbVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
@ -107,7 +111,7 @@ func (v ucbVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v ucbVideo) download() error {
func (v *ucbVideo) download() error {
videoPath := v.getFilename()
_, err := os.Stat(videoPath)
@ -146,7 +150,7 @@ func (v ucbVideo) download() error {
return nil
}
func (v ucbVideo) saveThumbnail() error {
func (v *ucbVideo) saveThumbnail() error {
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
if err != nil {
return err
@ -170,7 +174,7 @@ func (v ucbVideo) saveThumbnail() error {
return err
}
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: strPtr("UC Berkeley"),
@ -183,10 +187,16 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
ChangeAddress: &claimAddress,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
}
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
func (v *ucbVideo) Size() *int64 {
return nil
}
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {

View file

@ -11,6 +11,8 @@ import (
"strings"
"time"
"sync"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
@ -25,13 +27,16 @@ type YoutubeVideo struct {
title string
description string
playlistPosition int64
size *int64
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo {
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
return YoutubeVideo{
return &YoutubeVideo{
id: snippet.ResourceId.VideoId,
title: snippet.Title,
description: snippet.Description,
@ -42,23 +47,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You
}
}
func (v YoutubeVideo) ID() string {
func (v *YoutubeVideo) ID() string {
return v.id
}
func (v YoutubeVideo) PlaylistPosition() int {
func (v *YoutubeVideo) PlaylistPosition() int {
return int(v.playlistPosition)
}
func (v YoutubeVideo) IDAndNum() string {
func (v *YoutubeVideo) IDAndNum() string {
return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
}
func (v YoutubeVideo) PublishedAt() time.Time {
func (v *YoutubeVideo) PublishedAt() time.Time {
return v.publishedAt
}
func (v YoutubeVideo) getFilename() string {
func (v *YoutubeVideo) getFilename() string {
maxLen := 30
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
@ -85,7 +90,7 @@ func (v YoutubeVideo) getFilename() string {
return v.videoDir() + "/" + name + ".mp4"
}
func (v YoutubeVideo) getAbbrevDescription() string {
func (v *YoutubeVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
@ -94,7 +99,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v YoutubeVideo) download() error {
func (v *YoutubeVideo) download() error {
videoPath := v.getFilename()
err := os.Mkdir(v.videoDir(), 0750)
@ -118,20 +123,19 @@ func (v YoutubeVideo) download() error {
var downloadedFile *os.File
downloadedFile, err = os.Create(videoPath)
defer downloadedFile.Close()
if err != nil {
return err
}
defer downloadedFile.Close()
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile)
}
func (v YoutubeVideo) videoDir() string {
func (v *YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id
}
func (v YoutubeVideo) delete() error {
func (v *YoutubeVideo) delete() error {
videoPath := v.getFilename()
err := os.Remove(videoPath)
if err != nil {
@ -142,7 +146,7 @@ func (v YoutubeVideo) delete() error {
return nil
}
func (v YoutubeVideo) triggerThumbnailSave() error {
func (v *YoutubeVideo) triggerThumbnailSave() error {
client := &http.Client{Timeout: 30 * time.Second}
params, err := json.Marshal(map[string]string{"videoid": v.id})
@ -167,17 +171,17 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
}
var decoded struct {
error int `json:"error"`
url string `json:"url,omitempty"`
message string `json:"message,omitempty"`
Error int `json:"error"`
Url string `json:"url,omitempty"`
Message string `json:"message,omitempty"`
}
err = json.Unmarshal(contents, &decoded)
if err != nil {
return err
}
if decoded.error != 0 {
return errors.Err("error creating thumbnail: " + decoded.message)
if decoded.Error != 0 {
return errors.Err("error creating thumbnail: " + decoded.Message)
}
return nil
@ -185,7 +189,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
}
@ -200,10 +204,16 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
ChangeAddress: &claimAddress,
ChannelID: &channelID,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
}
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
func (v *YoutubeVideo) Size() *int64 {
return v.size
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
@ -215,7 +225,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
if err != nil {
return nil, err
}
if fi.Size() > int64(maxVideoSize)*1024*1024 {
videoSize := fi.Size()
v.size = &videoSize
if videoSize > int64(maxVideoSize)*1024*1024 {
//delete the video and ignore the error
_ = v.delete()
return nil, errors.Err("the video is too big to sync, skipping for now")

View file

@ -38,14 +38,16 @@ import (
const (
channelClaimAmount = 0.01
publishAmount = 0.01
maxReasonLength = 500
)
type video interface {
Size() *int64
ID() string
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error)
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
}
// sorting videos
@ -76,8 +78,9 @@ type Sync struct {
claimAddress string
videoDirectory string
db *redisdb.DB
syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo
syncedVideosMux *sync.Mutex
claimNames map[string]bool
grp *stop.Group
lbryChannelID string
@ -85,7 +88,7 @@ type Sync struct {
queue chan video
}
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{
@ -93,6 +96,9 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
Published: published,
FailureReason: failureReason,
}
if claimName != "" {
s.claimNames[claimName] = true
}
}
// SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -223,7 +229,7 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
s.syncedVideosMux = &sync.Mutex{}
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.db = redisdb.New()
s.grp = stop.New()
@ -236,12 +242,13 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
defer s.updateChannelStatus(&e)
@ -298,14 +305,15 @@ func (s *Sync) updateChannelStatus(e *error) {
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
failureReason := (*e).Error()
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil {
*e = err
}
@ -343,7 +351,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err
e = &err //not 100% sure
return
} else {
*e = errors.Prefix("failure uploading wallet: ", *e)
@ -357,9 +365,71 @@ func logShutdownError(shutdownErr error) {
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
func hasDupes(claims []jsonrpc.Claim) (bool, error) {
videoIDs := make(map[string]interface{})
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[:strings.LastIndex(tn, "/")+1]
_, ok := videoIDs[videoID]
if !ok {
videoIDs[videoID] = nil
continue
}
return true, nil
}
return false, nil
}
//publishesCount counts the amount of videos published so far
func publishesCount(claims []jsonrpc.Claim) (int, error) {
count := 0
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
count++
}
return count, nil
}
func (s *Sync) doSync() error {
var err error
claims, err := s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims: ", err)
}
hasDupes, err := hasDupes(*claims)
if err != nil {
return errors.Prefix("error checking for duplicates: ", err)
}
if hasDupes {
return errors.Err("channel has duplicates! Manual fix required")
}
pubsOnWallet, err := publishesCount(*claims)
if err != nil {
return errors.Prefix("error counting claims: ", err)
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
if sv.Published {
pubsOnDB++
}
}
if pubsOnWallet > pubsOnDB {
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
err = s.walletSetup()
if err != nil {
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
@ -371,10 +441,10 @@ func (s *Sync) doSync() error {
for i := 0; i < s.ConcurrentVideos; i++ {
s.grp.Add(1)
go func() {
go func(i int) {
defer s.grp.Done()
lyoshenka commented 2018-08-27 20:01:24 +02:00 (Migrated from github.com)
Review

why does this function return a pointer to an error?

why does this function return a pointer to an error?
nikooo777 commented 2018-08-27 20:11:34 +02:00 (Migrated from github.com)
Review

because the function is run as a deferred and needs to read/write to the error being returned by the original function. Is this a bad pattern?

because the function is run as a deferred and needs to read/write to the error being returned by the original function. Is this a bad pattern?
nikooo777 commented 2018-08-27 20:18:36 +02:00 (Migrated from github.com)
Review

well actually to be correct it's not returning a pointer to an error, it's accepting a pointer to an error so that I can change it from within.

well actually to be correct it's not returning a pointer to an error, it's accepting a pointer to an error so that I can change it from within.
s.startWorker(i)
}()
}(i)
}
if s.LbryChannelName == "@UCBerkeley" {
@ -469,8 +539,8 @@ func (s *Sync) startWorker(workerNum int) {
}
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error())
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
@ -627,9 +697,9 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now())
s.syncedVideosMux.Lock()
s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.Unlock()
s.syncedVideosMux.RUnlock()
alreadyPublished := ok && sv.Published
neverRetryFailures := []string{
@ -667,15 +737,16 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil {
return err
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
if err != nil {
return err
}
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil {
return err
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
s.AppendSyncedVideo(v.ID(), true, "")
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
return nil
}