Better tracking (Size and failure reason) #35
8 changed files with 239 additions and 119 deletions
|
@ -456,3 +456,15 @@ func (d *Client) NumClaimsInChannel(url string) (uint64, error) {
|
||||||
}
|
}
|
||||||
return channel.ClaimsInChannel, nil
|
return channel.ClaimsInChannel, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) {
|
||||||
|
response := new(ClaimListMineResponse)
|
||||||
|
err := d.call(response, "claim_list_mine", map[string]interface{}{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if response == nil {
|
||||||
|
return nil, errors.Err("no response")
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
|
@ -33,15 +33,22 @@ type Support struct {
|
||||||
type Claim struct {
|
type Claim struct {
|
||||||
Address string `json:"address"`
|
Address string `json:"address"`
|
||||||
Amount decimal.Decimal `json:"amount"`
|
Amount decimal.Decimal `json:"amount"`
|
||||||
|
BlocksToExpiration int `json:"blocks_to_expiration"`
|
||||||
|
Category string `json:"category"`
|
||||||
ClaimID string `json:"claim_id"`
|
ClaimID string `json:"claim_id"`
|
||||||
ClaimSequence int `json:"claim_sequence"`
|
ClaimSequence int `json:"claim_sequence"`
|
||||||
|
Confirmations int `json:"confirmations"`
|
||||||
DecodedClaim bool `json:"decoded_claim"`
|
DecodedClaim bool `json:"decoded_claim"`
|
||||||
Depth int `json:"depth"`
|
Depth int `json:"depth"`
|
||||||
EffectiveAmount decimal.Decimal `json:"effective_amount"`
|
EffectiveAmount decimal.Decimal `json:"effective_amount"`
|
||||||
|
ExpirationHeight int `json:"expiration_height"`
|
||||||
|
Expired bool `json:"expired"`
|
||||||
Height int `json:"height"`
|
Height int `json:"height"`
|
||||||
Hex string `json:"hex"`
|
Hex string `json:"hex"`
|
||||||
|
IsSpent bool `json:"is_spent"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Nout int `json:"nout"`
|
Nout int `json:"nout"`
|
||||||
|
PermanentUrl string `json:"permanent_url"`
|
||||||
Supports []Support `json:"supports"`
|
Supports []Support `json:"supports"`
|
||||||
Txid string `json:"txid"`
|
Txid string `json:"txid"`
|
||||||
ValidAtHeight int `json:"valid_at_height"`
|
ValidAtHeight int `json:"valid_at_height"`
|
||||||
|
@ -234,7 +241,7 @@ type ClaimListResponse struct {
|
||||||
LastTakeoverHeight int `json:"last_takeover_height"`
|
LastTakeoverHeight int `json:"last_takeover_height"`
|
||||||
SupportsWithoutClaims []Support `json:"supports_without_claims"`
|
SupportsWithoutClaims []Support `json:"supports_without_claims"`
|
||||||
}
|
}
|
||||||
|
type ClaimListMineResponse []Claim
|
||||||
type ClaimShowResponse Claim
|
type ClaimShowResponse Claim
|
||||||
|
|
||||||
type PeerListResponsePeer struct {
|
type PeerListResponsePeer struct {
|
||||||
|
|
|
@ -70,7 +70,7 @@ type apiYoutubeChannel struct {
|
||||||
SyncServer null.String `json:"sync_server"`
|
SyncServer null.String `json:"sync_server"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
|
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
|
||||||
endpoint := s.ApiURL + "/yt/jobs"
|
endpoint := s.ApiURL + "/yt/jobs"
|
||||||
res, _ := http.PostForm(endpoint, url.Values{
|
res, _ := http.PostForm(endpoint, url.Values{
|
||||||
"auth_token": {s.ApiToken},
|
"auth_token": {s.ApiToken},
|
||||||
|
@ -105,35 +105,41 @@ type syncedVideo struct {
|
||||||
VideoID string `json:"video_id"`
|
VideoID string `json:"video_id"`
|
||||||
Published bool `json:"published"`
|
Published bool `json:"published"`
|
||||||
FailureReason string `json:"failure_reason"`
|
FailureReason string `json:"failure_reason"`
|
||||||
|
ClaimName string `json:"claim_name"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) {
|
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
|
||||||
endpoint := s.ApiURL + "/yt/channel_status"
|
endpoint := s.ApiURL + "/yt/channel_status"
|
||||||
|
if len(failureReason) > maxReasonLength {
|
||||||
|
failureReason = failureReason[:maxReasonLength]
|
||||||
|
}
|
||||||
res, _ := http.PostForm(endpoint, url.Values{
|
res, _ := http.PostForm(endpoint, url.Values{
|
||||||
"channel_id": {channelID},
|
"channel_id": {channelID},
|
||||||
"sync_server": {s.HostName},
|
"sync_server": {s.HostName},
|
||||||
"auth_token": {s.ApiToken},
|
"auth_token": {s.ApiToken},
|
||||||
"sync_status": {status},
|
"sync_status": {status},
|
||||||
|
"failure_reason": {failureReason},
|
||||||
})
|
})
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
var response apiChannelStatusResponse
|
var response apiChannelStatusResponse
|
||||||
err := json.Unmarshal(body, &response)
|
err := json.Unmarshal(body, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if !response.Error.IsNull() {
|
if !response.Error.IsNull() {
|
||||||
return nil, errors.Err(response.Error.String)
|
return nil, nil, errors.Err(response.Error.String)
|
||||||
}
|
}
|
||||||
if response.Data != nil {
|
if response.Data != nil {
|
||||||
svs := make(map[string]syncedVideo)
|
svs := make(map[string]syncedVideo)
|
||||||
|
claimNames := make(map[string]bool)
|
||||||
for _, v := range response.Data {
|
for _, v := range response.Data {
|
||||||
svs[v.VideoID] = v
|
svs[v.VideoID] = v
|
||||||
|
claimNames[v.ClaimName] = v.Published
|
||||||
}
|
}
|
||||||
return svs, nil
|
return svs, claimNames, nil
|
||||||
}
|
}
|
||||||
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -141,9 +147,11 @@ const (
|
||||||
VideoStatusFailed = "failed"
|
VideoStatusFailed = "failed"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error {
|
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
|
||||||
endpoint := s.ApiURL + "/yt/video_status"
|
endpoint := s.ApiURL + "/yt/video_status"
|
||||||
|
if len(failureReason) > maxReasonLength {
|
||||||
|
failureReason = failureReason[:maxReasonLength]
|
||||||
|
}
|
||||||
vals := url.Values{
|
vals := url.Values{
|
||||||
"youtube_channel_id": {channelID},
|
"youtube_channel_id": {channelID},
|
||||||
"video_id": {videoID},
|
"video_id": {videoID},
|
||||||
|
@ -157,12 +165,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
|
||||||
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
|
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
vals.Add("claim_id", claimID)
|
vals.Add("claim_id", claimID)
|
||||||
vals.Add("claim_name", claimName)
|
vals.Add("claim_name", claimName)
|
||||||
|
if size != nil {
|
||||||
|
vals.Add("size", strconv.FormatInt(*size, 10))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if failureReason != "" {
|
if failureReason != "" {
|
||||||
maxReasonLength := 500
|
|
||||||
if len(failureReason) > maxReasonLength {
|
|
||||||
failureReason = failureReason[:500]
|
|
||||||
}
|
|
||||||
vals.Add("failure_reason", failureReason)
|
vals.Add("failure_reason", failureReason)
|
||||||
}
|
}
|
||||||
res, _ := http.PostForm(endpoint, vals)
|
res, _ := http.PostForm(endpoint, vals)
|
||||||
|
@ -186,7 +193,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
|
||||||
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s SyncManager) Start() error {
|
func (s *SyncManager) Start() error {
|
||||||
syncCount := 0
|
syncCount := 0
|
||||||
for {
|
for {
|
||||||
err := s.checkUsedSpace()
|
err := s.checkUsedSpace()
|
||||||
|
@ -220,7 +227,7 @@ func (s SyncManager) Start() error {
|
||||||
ConcurrentVideos: s.ConcurrentVideos,
|
ConcurrentVideos: s.ConcurrentVideos,
|
||||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
||||||
Refill: s.Refill,
|
Refill: s.Refill,
|
||||||
Manager: &s,
|
Manager: s,
|
||||||
LbrycrdString: s.LbrycrdString,
|
LbrycrdString: s.LbrycrdString,
|
||||||
AwsS3ID: s.AwsS3ID,
|
AwsS3ID: s.AwsS3ID,
|
||||||
AwsS3Secret: s.AwsS3Secret,
|
AwsS3Secret: s.AwsS3Secret,
|
||||||
|
@ -255,7 +262,7 @@ func (s SyncManager) Start() error {
|
||||||
ConcurrentVideos: s.ConcurrentVideos,
|
ConcurrentVideos: s.ConcurrentVideos,
|
||||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
||||||
Refill: s.Refill,
|
Refill: s.Refill,
|
||||||
Manager: &s,
|
Manager: s,
|
||||||
LbrycrdString: s.LbrycrdString,
|
LbrycrdString: s.LbrycrdString,
|
||||||
AwsS3ID: s.AwsS3ID,
|
AwsS3ID: s.AwsS3ID,
|
||||||
AwsS3Secret: s.AwsS3Secret,
|
AwsS3Secret: s.AwsS3Secret,
|
||||||
|
@ -305,11 +312,11 @@ func (s SyncManager) Start() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
|
func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
|
||||||
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
|
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s SyncManager) checkUsedSpace() error {
|
func (s *SyncManager) checkUsedSpace() error {
|
||||||
usedPctile, err := GetUsedSpace(s.BlobsDir)
|
usedPctile, err := GetUsedSpace(s.BlobsDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.RLock()
|
||||||
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
|
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
|
||||||
s.syncedVideosMux.Unlock()
|
s.syncedVideosMux.RUnlock()
|
||||||
log.Debugf("We already published %d videos", numPublished)
|
log.Debugf("We already published %d videos", numPublished)
|
||||||
|
|
||||||
if numOnSource-numPublished > s.Manager.VideosLimit {
|
if numOnSource-numPublished > s.Manager.VideosLimit {
|
||||||
|
|
|
@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string {
|
||||||
return name + suffix
|
return name + suffix
|
||||||
}
|
}
|
||||||
|
|
||||||
var publishedNamesMutex sync.RWMutex
|
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||||
var publishedNames = map[string]bool{}
|
|
||||||
|
|
||||||
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
|
|
||||||
attempt := 0
|
attempt := 0
|
||||||
for {
|
for {
|
||||||
attempt++
|
attempt++
|
||||||
name := getClaimNameFromTitle(title, attempt)
|
name := getClaimNameFromTitle(title, attempt)
|
||||||
|
|
||||||
publishedNamesMutex.RLock()
|
syncedVideosMux.Lock()
|
||||||
_, exists := publishedNames[name]
|
_, exists := claimNames[name]
|
||||||
publishedNamesMutex.RUnlock()
|
|
||||||
if exists {
|
if exists {
|
||||||
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
|
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
||||||
|
syncedVideosMux.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
claimNames[name] = false
|
||||||
|
syncedVideosMux.Unlock()
|
||||||
|
|
||||||
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
||||||
if len(name) < 2 {
|
if len(name) < 2 {
|
||||||
hasher := md5.New()
|
hasher := md5.New()
|
||||||
|
@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
|
||||||
|
|
||||||
response, err := daemon.Publish(name, filename, amount, options)
|
response, err := daemon.Publish(name, filename, amount, options)
|
||||||
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
|
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
|
||||||
publishedNamesMutex.Lock()
|
syncedVideosMux.Lock()
|
||||||
publishedNames[name] = true
|
claimNames[name] = true
|
||||||
publishedNamesMutex.Unlock()
|
syncedVideosMux.Unlock()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
||||||
} else {
|
} else {
|
||||||
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
|
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
|
@ -26,11 +28,13 @@ type ucbVideo struct {
|
||||||
description string
|
description string
|
||||||
publishedAt time.Time
|
publishedAt time.Time
|
||||||
dir string
|
dir string
|
||||||
|
claimNames map[string]bool
|
||||||
|
syncedVideosMux *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo {
|
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
|
||||||
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
|
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
|
||||||
return ucbVideo{
|
return &ucbVideo{
|
||||||
id: id,
|
id: id,
|
||||||
title: title,
|
title: title,
|
||||||
description: description,
|
description: description,
|
||||||
|
@ -40,19 +44,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim. there is only one mutex for the whole map, the pattern is the same you used locally in the function where a name is generated for the claim.
I however agree with your comment about passing down the mutex, I'm going to correct that.
|
|||||||
func (v ucbVideo) ID() string {
|
func (v *ucbVideo) ID() string {
|
||||||
return v.id
|
return v.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) PlaylistPosition() int {
|
func (v *ucbVideo) PlaylistPosition() int {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) IDAndNum() string {
|
func (v *ucbVideo) IDAndNum() string {
|
||||||
return v.ID() + " (?)"
|
return v.ID() + " (?)"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) PublishedAt() time.Time {
|
func (v *ucbVideo) PublishedAt() time.Time {
|
||||||
return v.publishedAt
|
return v.publishedAt
|
||||||
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
|
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
|
||||||
//matches := r.FindStringSubmatch(v.title)
|
//matches := r.FindStringSubmatch(v.title)
|
||||||
|
@ -65,11 +69,11 @@ func (v ucbVideo) PublishedAt() time.Time {
|
||||||
//return time.Now()
|
//return time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) getFilename() string {
|
func (v *ucbVideo) getFilename() string {
|
||||||
return v.dir + "/" + v.id + ".mp4"
|
return v.dir + "/" + v.id + ".mp4"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) getClaimName(attempt int) string {
|
func (v *ucbVideo) getClaimName(attempt int) string {
|
||||||
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||||
suffix := ""
|
suffix := ""
|
||||||
if attempt > 1 {
|
if attempt > 1 {
|
||||||
|
@ -98,7 +102,7 @@ func (v ucbVideo) getClaimName(attempt int) string {
|
||||||
return name + suffix
|
return name + suffix
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) getAbbrevDescription() string {
|
func (v *ucbVideo) getAbbrevDescription() string {
|
||||||
maxLines := 10
|
maxLines := 10
|
||||||
description := strings.TrimSpace(v.description)
|
description := strings.TrimSpace(v.description)
|
||||||
if strings.Count(description, "\n") < maxLines {
|
if strings.Count(description, "\n") < maxLines {
|
||||||
|
@ -107,7 +111,7 @@ func (v ucbVideo) getAbbrevDescription() string {
|
||||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) download() error {
|
func (v *ucbVideo) download() error {
|
||||||
videoPath := v.getFilename()
|
videoPath := v.getFilename()
|
||||||
|
|
||||||
_, err := os.Stat(videoPath)
|
_, err := os.Stat(videoPath)
|
||||||
|
@ -146,7 +150,7 @@ func (v ucbVideo) download() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) saveThumbnail() error {
|
func (v *ucbVideo) saveThumbnail() error {
|
||||||
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
|
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -170,7 +174,7 @@ func (v ucbVideo) saveThumbnail() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
||||||
options := jsonrpc.PublishOptions{
|
options := jsonrpc.PublishOptions{
|
||||||
Title: &v.title,
|
Title: &v.title,
|
||||||
Author: strPtr("UC Berkeley"),
|
Author: strPtr("UC Berkeley"),
|
||||||
|
@ -183,10 +187,16 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
|
||||||
ChangeAddress: &claimAddress,
|
ChangeAddress: &claimAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
|
func (v *ucbVideo) Size() *int64 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||||
|
v.claimNames = claimNames
|
||||||
|
v.syncedVideosMux = syncedVideosMux
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -11,6 +11,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/errors"
|
"github.com/lbryio/lbry.go/errors"
|
||||||
"github.com/lbryio/lbry.go/jsonrpc"
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
|
|
||||||
|
@ -25,13 +27,16 @@ type YoutubeVideo struct {
|
||||||
title string
|
title string
|
||||||
description string
|
description string
|
||||||
playlistPosition int64
|
playlistPosition int64
|
||||||
|
size *int64
|
||||||
publishedAt time.Time
|
publishedAt time.Time
|
||||||
dir string
|
dir string
|
||||||
|
claimNames map[string]bool
|
||||||
|
syncedVideosMux *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo {
|
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
|
||||||
publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
|
publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
|
||||||
return YoutubeVideo{
|
return &YoutubeVideo{
|
||||||
id: snippet.ResourceId.VideoId,
|
id: snippet.ResourceId.VideoId,
|
||||||
title: snippet.Title,
|
title: snippet.Title,
|
||||||
description: snippet.Description,
|
description: snippet.Description,
|
||||||
|
@ -42,23 +47,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) ID() string {
|
func (v *YoutubeVideo) ID() string {
|
||||||
return v.id
|
return v.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) PlaylistPosition() int {
|
func (v *YoutubeVideo) PlaylistPosition() int {
|
||||||
return int(v.playlistPosition)
|
return int(v.playlistPosition)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) IDAndNum() string {
|
func (v *YoutubeVideo) IDAndNum() string {
|
||||||
return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
|
return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) PublishedAt() time.Time {
|
func (v *YoutubeVideo) PublishedAt() time.Time {
|
||||||
return v.publishedAt
|
return v.publishedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) getFilename() string {
|
func (v *YoutubeVideo) getFilename() string {
|
||||||
maxLen := 30
|
maxLen := 30
|
||||||
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||||
|
|
||||||
|
@ -85,7 +90,7 @@ func (v YoutubeVideo) getFilename() string {
|
||||||
return v.videoDir() + "/" + name + ".mp4"
|
return v.videoDir() + "/" + name + ".mp4"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) getAbbrevDescription() string {
|
func (v *YoutubeVideo) getAbbrevDescription() string {
|
||||||
maxLines := 10
|
maxLines := 10
|
||||||
description := strings.TrimSpace(v.description)
|
description := strings.TrimSpace(v.description)
|
||||||
if strings.Count(description, "\n") < maxLines {
|
if strings.Count(description, "\n") < maxLines {
|
||||||
|
@ -94,7 +99,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
|
||||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) download() error {
|
func (v *YoutubeVideo) download() error {
|
||||||
videoPath := v.getFilename()
|
videoPath := v.getFilename()
|
||||||
|
|
||||||
err := os.Mkdir(v.videoDir(), 0750)
|
err := os.Mkdir(v.videoDir(), 0750)
|
||||||
|
@ -118,20 +123,19 @@ func (v YoutubeVideo) download() error {
|
||||||
|
|
||||||
var downloadedFile *os.File
|
var downloadedFile *os.File
|
||||||
downloadedFile, err = os.Create(videoPath)
|
downloadedFile, err = os.Create(videoPath)
|
||||||
|
defer downloadedFile.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer downloadedFile.Close()
|
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile)
|
||||||
|
|
||||||
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) videoDir() string {
|
func (v *YoutubeVideo) videoDir() string {
|
||||||
return v.dir + "/" + v.id
|
return v.dir + "/" + v.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) delete() error {
|
func (v *YoutubeVideo) delete() error {
|
||||||
videoPath := v.getFilename()
|
videoPath := v.getFilename()
|
||||||
err := os.Remove(videoPath)
|
err := os.Remove(videoPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -142,7 +146,7 @@ func (v YoutubeVideo) delete() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) triggerThumbnailSave() error {
|
func (v *YoutubeVideo) triggerThumbnailSave() error {
|
||||||
client := &http.Client{Timeout: 30 * time.Second}
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
|
||||||
params, err := json.Marshal(map[string]string{"videoid": v.id})
|
params, err := json.Marshal(map[string]string{"videoid": v.id})
|
||||||
|
@ -167,17 +171,17 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var decoded struct {
|
var decoded struct {
|
||||||
error int `json:"error"`
|
Error int `json:"error"`
|
||||||
url string `json:"url,omitempty"`
|
Url string `json:"url,omitempty"`
|
||||||
message string `json:"message,omitempty"`
|
Message string `json:"message,omitempty"`
|
||||||
}
|
}
|
||||||
err = json.Unmarshal(contents, &decoded)
|
err = json.Unmarshal(contents, &decoded)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if decoded.error != 0 {
|
if decoded.Error != 0 {
|
||||||
return errors.Err("error creating thumbnail: " + decoded.message)
|
return errors.Err("error creating thumbnail: " + decoded.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -185,7 +189,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
|
||||||
|
|
||||||
func strPtr(s string) *string { return &s }
|
func strPtr(s string) *string { return &s }
|
||||||
|
|
||||||
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
||||||
if channelID == "" {
|
if channelID == "" {
|
||||||
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
|
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
|
||||||
}
|
}
|
||||||
|
@ -200,10 +204,16 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
|
||||||
ChangeAddress: &claimAddress,
|
ChangeAddress: &claimAddress,
|
||||||
ChannelID: &channelID,
|
ChannelID: &channelID,
|
||||||
}
|
}
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
|
func (v *YoutubeVideo) Size() *int64 {
|
||||||
|
return v.size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||||
|
v.claimNames = claimNames
|
||||||
|
v.syncedVideosMux = syncedVideosMux
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -215,7 +225,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if fi.Size() > int64(maxVideoSize)*1024*1024 {
|
videoSize := fi.Size()
|
||||||
|
v.size = &videoSize
|
||||||
|
|
||||||
|
if videoSize > int64(maxVideoSize)*1024*1024 {
|
||||||
//delete the video and ignore the error
|
//delete the video and ignore the error
|
||||||
_ = v.delete()
|
_ = v.delete()
|
||||||
return nil, errors.Err("the video is too big to sync, skipping for now")
|
return nil, errors.Err("the video is too big to sync, skipping for now")
|
||||||
|
|
109
ytsync/ytsync.go
109
ytsync/ytsync.go
|
@ -38,14 +38,16 @@ import (
|
||||||
const (
|
const (
|
||||||
channelClaimAmount = 0.01
|
channelClaimAmount = 0.01
|
||||||
publishAmount = 0.01
|
publishAmount = 0.01
|
||||||
|
maxReasonLength = 500
|
||||||
)
|
)
|
||||||
|
|
||||||
type video interface {
|
type video interface {
|
||||||
|
Size() *int64
|
||||||
ID() string
|
ID() string
|
||||||
IDAndNum() string
|
IDAndNum() string
|
||||||
PlaylistPosition() int
|
PlaylistPosition() int
|
||||||
PublishedAt() time.Time
|
PublishedAt() time.Time
|
||||||
Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error)
|
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sorting videos
|
// sorting videos
|
||||||
|
@ -76,8 +78,9 @@ type Sync struct {
|
||||||
claimAddress string
|
claimAddress string
|
||||||
videoDirectory string
|
videoDirectory string
|
||||||
db *redisdb.DB
|
db *redisdb.DB
|
||||||
|
syncedVideosMux *sync.RWMutex
|
||||||
syncedVideos map[string]syncedVideo
|
syncedVideos map[string]syncedVideo
|
||||||
syncedVideosMux *sync.Mutex
|
claimNames map[string]bool
|
||||||
grp *stop.Group
|
grp *stop.Group
|
||||||
lbryChannelID string
|
lbryChannelID string
|
||||||
|
|
||||||
|
@ -85,7 +88,7 @@ type Sync struct {
|
||||||
queue chan video
|
queue chan video
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
|
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.Lock()
|
||||||
defer s.syncedVideosMux.Unlock()
|
defer s.syncedVideosMux.Unlock()
|
||||||
s.syncedVideos[videoID] = syncedVideo{
|
s.syncedVideos[videoID] = syncedVideo{
|
||||||
|
@ -93,6 +96,9 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
|
||||||
Published: published,
|
Published: published,
|
||||||
FailureReason: failureReason,
|
FailureReason: failureReason,
|
||||||
}
|
}
|
||||||
|
if claimName != "" {
|
||||||
|
s.claimNames[claimName] = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
||||||
|
@ -223,7 +229,7 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
if s.YoutubeChannelID == "" {
|
if s.YoutubeChannelID == "" {
|
||||||
return errors.Err("channel ID not provided")
|
return errors.Err("channel ID not provided")
|
||||||
}
|
}
|
||||||
s.syncedVideosMux = &sync.Mutex{}
|
s.syncedVideosMux = &sync.RWMutex{}
|
||||||
s.walletMux = &sync.Mutex{}
|
s.walletMux = &sync.Mutex{}
|
||||||
s.db = redisdb.New()
|
s.db = redisdb.New()
|
||||||
s.grp = stop.New()
|
s.grp = stop.New()
|
||||||
|
@ -236,12 +242,13 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
||||||
s.grp.Stop()
|
s.grp.Stop()
|
||||||
}()
|
}()
|
||||||
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
|
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.Lock()
|
||||||
s.syncedVideos = syncedVideos
|
s.syncedVideos = syncedVideos
|
||||||
|
s.claimNames = claimNames
|
||||||
s.syncedVideosMux.Unlock()
|
s.syncedVideosMux.Unlock()
|
||||||
|
|
||||||
defer s.updateChannelStatus(&e)
|
defer s.updateChannelStatus(&e)
|
||||||
|
@ -298,14 +305,15 @@ func (s *Sync) updateChannelStatus(e *error) {
|
||||||
if util.SubstringInSlice((*e).Error(), noFailConditions) {
|
if util.SubstringInSlice((*e).Error(), noFailConditions) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
|
failureReason := (*e).Error()
|
||||||
|
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
||||||
err = errors.Prefix(msg, err)
|
err = errors.Prefix(msg, err)
|
||||||
*e = errors.Prefix(err.Error(), *e)
|
*e = errors.Prefix(err.Error(), *e)
|
||||||
}
|
}
|
||||||
} else if !s.IsInterrupted() {
|
} else if !s.IsInterrupted() {
|
||||||
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
|
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
*e = err
|
*e = err
|
||||||
}
|
}
|
||||||
|
@ -343,7 +351,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
|
||||||
err := s.uploadWallet()
|
err := s.uploadWallet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if *e == nil {
|
if *e == nil {
|
||||||
e = &err
|
e = &err //not 100% sure
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
*e = errors.Prefix("failure uploading wallet: ", *e)
|
*e = errors.Prefix("failure uploading wallet: ", *e)
|
||||||
|
@ -357,9 +365,71 @@ func logShutdownError(shutdownErr error) {
|
||||||
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
|
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func hasDupes(claims []jsonrpc.Claim) (bool, error) {
|
||||||
|
videoIDs := make(map[string]interface{})
|
||||||
|
for _, c := range claims {
|
||||||
|
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
|
||||||
|
return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
|
||||||
|
}
|
||||||
|
tn := *c.Value.Stream.Metadata.Thumbnail
|
||||||
|
videoID := tn[:strings.LastIndex(tn, "/")+1]
|
||||||
|
_, ok := videoIDs[videoID]
|
||||||
|
if !ok {
|
||||||
|
videoIDs[videoID] = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//publishesCount counts the amount of videos published so far
|
||||||
|
func publishesCount(claims []jsonrpc.Claim) (int, error) {
|
||||||
|
count := 0
|
||||||
|
for _, c := range claims {
|
||||||
|
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
|
||||||
|
return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Sync) doSync() error {
|
func (s *Sync) doSync() error {
|
||||||
var err error
|
var err error
|
||||||
|
claims, err := s.daemon.ClaimListMine()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Prefix("cannot list claims: ", err)
|
||||||
|
}
|
||||||
|
hasDupes, err := hasDupes(*claims)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Prefix("error checking for duplicates: ", err)
|
||||||
|
}
|
||||||
|
if hasDupes {
|
||||||
|
return errors.Err("channel has duplicates! Manual fix required")
|
||||||
|
}
|
||||||
|
pubsOnWallet, err := publishesCount(*claims)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Prefix("error counting claims: ", err)
|
||||||
|
}
|
||||||
|
pubsOnDB := 0
|
||||||
|
for _, sv := range s.syncedVideos {
|
||||||
|
if sv.Published {
|
||||||
|
pubsOnDB++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if pubsOnWallet > pubsOnDB {
|
||||||
|
return errors.Err("not all published videos are in the database")
|
||||||
|
}
|
||||||
|
if pubsOnWallet < pubsOnDB {
|
||||||
|
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
|
||||||
|
}
|
||||||
err = s.walletSetup()
|
err = s.walletSetup()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
|
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
|
||||||
|
@ -371,10 +441,10 @@ func (s *Sync) doSync() error {
|
||||||
|
|
||||||
for i := 0; i < s.ConcurrentVideos; i++ {
|
for i := 0; i < s.ConcurrentVideos; i++ {
|
||||||
s.grp.Add(1)
|
s.grp.Add(1)
|
||||||
go func() {
|
go func(i int) {
|
||||||
defer s.grp.Done()
|
defer s.grp.Done()
|
||||||
why does this function return a pointer to an error? why does this function return a pointer to an error?
because the function is run as a deferred and needs to read/write to the error being returned by the original function. Is this a bad pattern? because the function is run as a deferred and needs to read/write to the error being returned by the original function. Is this a bad pattern?
well actually to be correct it's not returning a pointer to an error, it's accepting a pointer to an error so that I can change it from within. well actually to be correct it's not returning a pointer to an error, it's accepting a pointer to an error so that I can change it from within.
|
|||||||
s.startWorker(i)
|
s.startWorker(i)
|
||||||
}()
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.LbryChannelName == "@UCBerkeley" {
|
if s.LbryChannelName == "@UCBerkeley" {
|
||||||
|
@ -469,8 +539,8 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
}
|
}
|
||||||
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
||||||
}
|
}
|
||||||
s.AppendSyncedVideo(v.ID(), false, err.Error())
|
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
|
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -627,9 +697,9 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
log.Println(v.ID() + " took " + time.Since(start).String())
|
log.Println(v.ID() + " took " + time.Since(start).String())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.RLock()
|
||||||
sv, ok := s.syncedVideos[v.ID()]
|
sv, ok := s.syncedVideos[v.ID()]
|
||||||
s.syncedVideosMux.Unlock()
|
s.syncedVideosMux.RUnlock()
|
||||||
alreadyPublished := ok && sv.Published
|
alreadyPublished := ok && sv.Published
|
||||||
|
|
||||||
neverRetryFailures := []string{
|
neverRetryFailures := []string{
|
||||||
|
@ -667,15 +737,16 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
|
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
|
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||||
}
|
}
|
||||||
s.AppendSyncedVideo(v.ID(), true, "")
|
|
||||||
|
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue
why are these fields here if they never get used?
and having syncedVideosMux on each video is weird. whats going on?