Merge pull request #35 from lbryio/better-tracking

Better tracking (Size and failure reason)
This commit is contained in:
Niko 2018-09-26 22:26:48 +02:00 committed by GitHub
commit 4afbfeb01b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 239 additions and 119 deletions

View file

@ -456,3 +456,15 @@ func (d *Client) NumClaimsInChannel(url string) (uint64, error) {
}
return channel.ClaimsInChannel, nil
}
func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) {
response := new(ClaimListMineResponse)
err := d.call(response, "claim_list_mine", map[string]interface{}{})
if err != nil {
return nil, err
} else if response == nil {
return nil, errors.Err("no response")
}
return response, nil
}

View file

@ -31,25 +31,32 @@ type Support struct {
}
type Claim struct {
Address string `json:"address"`
Amount decimal.Decimal `json:"amount"`
ClaimID string `json:"claim_id"`
ClaimSequence int `json:"claim_sequence"`
DecodedClaim bool `json:"decoded_claim"`
Depth int `json:"depth"`
EffectiveAmount decimal.Decimal `json:"effective_amount"`
Height int `json:"height"`
Hex string `json:"hex"`
Name string `json:"name"`
Nout int `json:"nout"`
Supports []Support `json:"supports"`
Txid string `json:"txid"`
ValidAtHeight int `json:"valid_at_height"`
Value lbryschema.Claim `json:"value"`
Error *string `json:"error,omitempty"`
ChannelName *string `json:"channel_name,omitempty"`
HasSignature *bool `json:"has_signature,omitempty"`
SignatureIsValid *bool `json:"signature_is_valid,omitempty"`
Address string `json:"address"`
Amount decimal.Decimal `json:"amount"`
BlocksToExpiration int `json:"blocks_to_expiration"`
Category string `json:"category"`
ClaimID string `json:"claim_id"`
ClaimSequence int `json:"claim_sequence"`
Confirmations int `json:"confirmations"`
DecodedClaim bool `json:"decoded_claim"`
Depth int `json:"depth"`
EffectiveAmount decimal.Decimal `json:"effective_amount"`
ExpirationHeight int `json:"expiration_height"`
Expired bool `json:"expired"`
Height int `json:"height"`
Hex string `json:"hex"`
IsSpent bool `json:"is_spent"`
Name string `json:"name"`
Nout int `json:"nout"`
PermanentUrl string `json:"permanent_url"`
Supports []Support `json:"supports"`
Txid string `json:"txid"`
ValidAtHeight int `json:"valid_at_height"`
Value lbryschema.Claim `json:"value"`
Error *string `json:"error,omitempty"`
ChannelName *string `json:"channel_name,omitempty"`
HasSignature *bool `json:"has_signature,omitempty"`
SignatureIsValid *bool `json:"signature_is_valid,omitempty"`
}
type File struct {
@ -234,7 +241,7 @@ type ClaimListResponse struct {
LastTakeoverHeight int `json:"last_takeover_height"`
SupportsWithoutClaims []Support `json:"supports_without_claims"`
}
type ClaimListMineResponse []Claim
type ClaimShowResponse Claim
type PeerListResponsePeer struct {

View file

@ -70,7 +70,7 @@ type apiYoutubeChannel struct {
SyncServer null.String `json:"sync_server"`
}
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
endpoint := s.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {s.ApiToken},
@ -105,35 +105,41 @@ type syncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) {
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {s.HostName},
"auth_token": {s.ApiToken},
"sync_status": {status},
"channel_id": {channelID},
"sync_server": {s.HostName},
"auth_token": {s.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, errors.Err(response.Error.String)
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]syncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, nil
return svs, claimNames, nil
}
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
@ -141,9 +147,11 @@ const (
VideoStatusFailed = "failed"
)
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error {
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := s.ApiURL + "/yt/video_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
@ -157,12 +165,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
maxReasonLength := 500
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:500]
}
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
@ -186,7 +193,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
func (s SyncManager) Start() error {
func (s *SyncManager) Start() error {
syncCount := 0
for {
err := s.checkUsedSpace()
@ -220,7 +227,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
@ -255,7 +262,7 @@ func (s SyncManager) Start() error {
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
@ -305,11 +312,11 @@ func (s SyncManager) Start() error {
return nil
}
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
}
func (s SyncManager) checkUsedSpace() error {
func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil {
return err

View file

@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error {
return nil
}
s.syncedVideosMux.Lock()
s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.Unlock()
s.syncedVideosMux.RUnlock()
log.Debugf("We already published %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit {

View file

@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string {
return name + suffix
}
var publishedNamesMutex sync.RWMutex
var publishedNames = map[string]bool{}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
attempt := 0
for {
attempt++
name := getClaimNameFromTitle(title, attempt)
publishedNamesMutex.RLock()
_, exists := publishedNames[name]
publishedNamesMutex.RUnlock()
syncedVideosMux.Lock()
_, exists := claimNames[name]
if exists {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
log.Printf("name exists, retrying (%d attempts so far)", attempt)
syncedVideosMux.Unlock()
continue
}
claimNames[name] = false
syncedVideosMux.Unlock()
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
hasher := md5.New()
@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
publishedNamesMutex.Lock()
publishedNames[name] = true
publishedNamesMutex.Unlock()
syncedVideosMux.Lock()
claimNames[name] = true
syncedVideosMux.Unlock()
if err == nil {
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
log.Printf("name exists, retrying (%d attempts so far)", attempt)
continue
}
} else {

View file

@ -8,6 +8,8 @@ import (
"strings"
"time"
"sync"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@ -20,17 +22,19 @@ import (
)
type ucbVideo struct {
id string
title string
channel string
description string
publishedAt time.Time
dir string
id string
title string
channel string
description string
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo {
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
return ucbVideo{
return &ucbVideo{
id: id,
title: title,
description: description,
@ -40,19 +44,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi
}
}
func (v ucbVideo) ID() string {
func (v *ucbVideo) ID() string {
return v.id
}
func (v ucbVideo) PlaylistPosition() int {
func (v *ucbVideo) PlaylistPosition() int {
return 0
}
func (v ucbVideo) IDAndNum() string {
func (v *ucbVideo) IDAndNum() string {
return v.ID() + " (?)"
}
func (v ucbVideo) PublishedAt() time.Time {
func (v *ucbVideo) PublishedAt() time.Time {
return v.publishedAt
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
//matches := r.FindStringSubmatch(v.title)
@ -65,11 +69,11 @@ func (v ucbVideo) PublishedAt() time.Time {
//return time.Now()
}
func (v ucbVideo) getFilename() string {
func (v *ucbVideo) getFilename() string {
return v.dir + "/" + v.id + ".mp4"
}
func (v ucbVideo) getClaimName(attempt int) string {
func (v *ucbVideo) getClaimName(attempt int) string {
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
suffix := ""
if attempt > 1 {
@ -98,7 +102,7 @@ func (v ucbVideo) getClaimName(attempt int) string {
return name + suffix
}
func (v ucbVideo) getAbbrevDescription() string {
func (v *ucbVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
@ -107,7 +111,7 @@ func (v ucbVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v ucbVideo) download() error {
func (v *ucbVideo) download() error {
videoPath := v.getFilename()
_, err := os.Stat(videoPath)
@ -146,7 +150,7 @@ func (v ucbVideo) download() error {
return nil
}
func (v ucbVideo) saveThumbnail() error {
func (v *ucbVideo) saveThumbnail() error {
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
if err != nil {
return err
@ -170,7 +174,7 @@ func (v ucbVideo) saveThumbnail() error {
return err
}
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: strPtr("UC Berkeley"),
@ -183,10 +187,16 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
ChangeAddress: &claimAddress,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
}
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
func (v *ucbVideo) Size() *int64 {
return nil
}
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {

View file

@ -11,6 +11,8 @@ import (
"strings"
"time"
"sync"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
@ -25,13 +27,16 @@ type YoutubeVideo struct {
title string
description string
playlistPosition int64
size *int64
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo {
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
return YoutubeVideo{
return &YoutubeVideo{
id: snippet.ResourceId.VideoId,
title: snippet.Title,
description: snippet.Description,
@ -42,23 +47,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You
}
}
func (v YoutubeVideo) ID() string {
func (v *YoutubeVideo) ID() string {
return v.id
}
func (v YoutubeVideo) PlaylistPosition() int {
func (v *YoutubeVideo) PlaylistPosition() int {
return int(v.playlistPosition)
}
func (v YoutubeVideo) IDAndNum() string {
func (v *YoutubeVideo) IDAndNum() string {
return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
}
func (v YoutubeVideo) PublishedAt() time.Time {
func (v *YoutubeVideo) PublishedAt() time.Time {
return v.publishedAt
}
func (v YoutubeVideo) getFilename() string {
func (v *YoutubeVideo) getFilename() string {
maxLen := 30
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
@ -85,7 +90,7 @@ func (v YoutubeVideo) getFilename() string {
return v.videoDir() + "/" + name + ".mp4"
}
func (v YoutubeVideo) getAbbrevDescription() string {
func (v *YoutubeVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
@ -94,7 +99,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v YoutubeVideo) download() error {
func (v *YoutubeVideo) download() error {
videoPath := v.getFilename()
err := os.Mkdir(v.videoDir(), 0750)
@ -118,20 +123,19 @@ func (v YoutubeVideo) download() error {
var downloadedFile *os.File
downloadedFile, err = os.Create(videoPath)
defer downloadedFile.Close()
if err != nil {
return err
}
defer downloadedFile.Close()
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile)
}
func (v YoutubeVideo) videoDir() string {
func (v *YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id
}
func (v YoutubeVideo) delete() error {
func (v *YoutubeVideo) delete() error {
videoPath := v.getFilename()
err := os.Remove(videoPath)
if err != nil {
@ -142,7 +146,7 @@ func (v YoutubeVideo) delete() error {
return nil
}
func (v YoutubeVideo) triggerThumbnailSave() error {
func (v *YoutubeVideo) triggerThumbnailSave() error {
client := &http.Client{Timeout: 30 * time.Second}
params, err := json.Marshal(map[string]string{"videoid": v.id})
@ -167,17 +171,17 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
}
var decoded struct {
error int `json:"error"`
url string `json:"url,omitempty"`
message string `json:"message,omitempty"`
Error int `json:"error"`
Url string `json:"url,omitempty"`
Message string `json:"message,omitempty"`
}
err = json.Unmarshal(contents, &decoded)
if err != nil {
return err
}
if decoded.error != 0 {
return errors.Err("error creating thumbnail: " + decoded.message)
if decoded.Error != 0 {
return errors.Err("error creating thumbnail: " + decoded.Message)
}
return nil
@ -185,7 +189,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
}
@ -200,10 +204,16 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
ChangeAddress: &claimAddress,
ChannelID: &channelID,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
}
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
func (v *YoutubeVideo) Size() *int64 {
return v.size
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
@ -215,7 +225,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
if err != nil {
return nil, err
}
if fi.Size() > int64(maxVideoSize)*1024*1024 {
videoSize := fi.Size()
v.size = &videoSize
if videoSize > int64(maxVideoSize)*1024*1024 {
//delete the video and ignore the error
_ = v.delete()
return nil, errors.Err("the video is too big to sync, skipping for now")

View file

@ -38,14 +38,16 @@ import (
const (
channelClaimAmount = 0.01
publishAmount = 0.01
maxReasonLength = 500
)
type video interface {
Size() *int64
ID() string
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error)
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
}
// sorting videos
@ -76,8 +78,9 @@ type Sync struct {
claimAddress string
videoDirectory string
db *redisdb.DB
syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo
syncedVideosMux *sync.Mutex
claimNames map[string]bool
grp *stop.Group
lbryChannelID string
@ -85,7 +88,7 @@ type Sync struct {
queue chan video
}
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{
@ -93,6 +96,9 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
Published: published,
FailureReason: failureReason,
}
if claimName != "" {
s.claimNames[claimName] = true
}
}
// SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -223,7 +229,7 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
s.syncedVideosMux = &sync.Mutex{}
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.db = redisdb.New()
s.grp = stop.New()
@ -236,12 +242,13 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
defer s.updateChannelStatus(&e)
@ -298,14 +305,15 @@ func (s *Sync) updateChannelStatus(e *error) {
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
failureReason := (*e).Error()
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil {
*e = err
}
@ -343,7 +351,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err
e = &err //not 100% sure
return
} else {
*e = errors.Prefix("failure uploading wallet: ", *e)
@ -357,9 +365,71 @@ func logShutdownError(shutdownErr error) {
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
func hasDupes(claims []jsonrpc.Claim) (bool, error) {
videoIDs := make(map[string]interface{})
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[:strings.LastIndex(tn, "/")+1]
_, ok := videoIDs[videoID]
if !ok {
videoIDs[videoID] = nil
continue
}
return true, nil
}
return false, nil
}
//publishesCount counts the amount of videos published so far
func publishesCount(claims []jsonrpc.Claim) (int, error) {
count := 0
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
count++
}
return count, nil
}
func (s *Sync) doSync() error {
var err error
claims, err := s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims: ", err)
}
hasDupes, err := hasDupes(*claims)
if err != nil {
return errors.Prefix("error checking for duplicates: ", err)
}
if hasDupes {
return errors.Err("channel has duplicates! Manual fix required")
}
pubsOnWallet, err := publishesCount(*claims)
if err != nil {
return errors.Prefix("error counting claims: ", err)
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
if sv.Published {
pubsOnDB++
}
}
if pubsOnWallet > pubsOnDB {
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
err = s.walletSetup()
if err != nil {
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
@ -371,10 +441,10 @@ func (s *Sync) doSync() error {
for i := 0; i < s.ConcurrentVideos; i++ {
s.grp.Add(1)
go func() {
go func(i int) {
defer s.grp.Done()
s.startWorker(i)
}()
}(i)
}
if s.LbryChannelName == "@UCBerkeley" {
@ -469,8 +539,8 @@ func (s *Sync) startWorker(workerNum int) {
}
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error())
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
@ -627,9 +697,9 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now())
s.syncedVideosMux.Lock()
s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.Unlock()
s.syncedVideosMux.RUnlock()
alreadyPublished := ok && sv.Published
neverRetryFailures := []string{
@ -667,15 +737,16 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil {
return err
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
if err != nil {
return err
}
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil {
return err
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
s.AppendSyncedVideo(v.ID(), true, "")
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
return nil
}