Better tracking (Size and failure reason) #35
8 changed files with 239 additions and 119 deletions
@@ -456,3 +456,15 @@ func (d *Client) NumClaimsInChannel(url string) (uint64, error) {
 	}
 	return channel.ClaimsInChannel, nil
 }
+
+func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) {
+	response := new(ClaimListMineResponse)
+	err := d.call(response, "claim_list_mine", map[string]interface{}{})
+	if err != nil {
+		return nil, err
+	} else if response == nil {
+		return nil, errors.Err("no response")
+	}
+
+	return response, nil
+}

@@ -31,25 +31,32 @@ type Support struct {
 }

 type Claim struct {
 	Address string `json:"address"`
 	Amount decimal.Decimal `json:"amount"`
+	BlocksToExpiration int `json:"blocks_to_expiration"`
+	Category string `json:"category"`
 	ClaimID string `json:"claim_id"`
 	ClaimSequence int `json:"claim_sequence"`
+	Confirmations int `json:"confirmations"`
 	DecodedClaim bool `json:"decoded_claim"`
 	Depth int `json:"depth"`
 	EffectiveAmount decimal.Decimal `json:"effective_amount"`
+	ExpirationHeight int `json:"expiration_height"`
+	Expired bool `json:"expired"`
 	Height int `json:"height"`
 	Hex string `json:"hex"`
+	IsSpent bool `json:"is_spent"`
 	Name string `json:"name"`
 	Nout int `json:"nout"`
+	PermanentUrl string `json:"permanent_url"`
 	Supports []Support `json:"supports"`
 	Txid string `json:"txid"`
 	ValidAtHeight int `json:"valid_at_height"`
 	Value lbryschema.Claim `json:"value"`
 	Error *string `json:"error,omitempty"`
 	ChannelName *string `json:"channel_name,omitempty"`
 	HasSignature *bool `json:"has_signature,omitempty"`
 	SignatureIsValid *bool `json:"signature_is_valid,omitempty"`
 }

 type File struct {

@@ -234,7 +241,7 @@ type ClaimListResponse struct {
 	LastTakeoverHeight int `json:"last_takeover_height"`
 	SupportsWithoutClaims []Support `json:"supports_without_claims"`
 }

+type ClaimListMineResponse []Claim
 type ClaimShowResponse Claim

 type PeerListResponsePeer struct {

@@ -70,7 +70,7 @@ type apiYoutubeChannel struct {
 	SyncServer null.String `json:"sync_server"`
 }

-func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
+func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
 	endpoint := s.ApiURL + "/yt/jobs"
 	res, _ := http.PostForm(endpoint, url.Values{
 		"auth_token": {s.ApiToken},

@@ -105,35 +105,41 @@ type syncedVideo struct {
 	VideoID string `json:"video_id"`
 	Published bool `json:"published"`
 	FailureReason string `json:"failure_reason"`
 	ClaimName string `json:"claim_name"`
 }

-func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) {
+func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
 	endpoint := s.ApiURL + "/yt/channel_status"
+
+	if len(failureReason) > maxReasonLength {
+		failureReason = failureReason[:maxReasonLength]
+	}
 	res, _ := http.PostForm(endpoint, url.Values{
-		"channel_id": {channelID},
-		"sync_server": {s.HostName},
-		"auth_token": {s.ApiToken},
-		"sync_status": {status},
+		"channel_id": {channelID},
+		"sync_server": {s.HostName},
+		"auth_token": {s.ApiToken},
+		"sync_status": {status},
+		"failure_reason": {failureReason},
 	})
 	defer res.Body.Close()
 	body, _ := ioutil.ReadAll(res.Body)
 	var response apiChannelStatusResponse
 	err := json.Unmarshal(body, &response)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	if !response.Error.IsNull() {
-		return nil, errors.Err(response.Error.String)
+		return nil, nil, errors.Err(response.Error.String)
 	}
 	if response.Data != nil {
 		svs := make(map[string]syncedVideo)
+		claimNames := make(map[string]bool)
 		for _, v := range response.Data {
 			svs[v.VideoID] = v
+			claimNames[v.ClaimName] = v.Published
 		}
-		return svs, nil
+		return svs, claimNames, nil
 	}
-	return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
+	return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
 }

 const (

@@ -141,9 +147,11 @@ const (
 	VideoStatusFailed = "failed"
 )

-func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error {
+func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
 	endpoint := s.ApiURL + "/yt/video_status"
+
+	if len(failureReason) > maxReasonLength {
+		failureReason = failureReason[:maxReasonLength]
+	}
 	vals := url.Values{
 		"youtube_channel_id": {channelID},
 		"video_id": {videoID},

@@ -157,12 +165,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
 		vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
 		vals.Add("claim_id", claimID)
 		vals.Add("claim_name", claimName)
+		if size != nil {
+			vals.Add("size", strconv.FormatInt(*size, 10))
+		}
 	}
 	if failureReason != "" {
-		maxReasonLength := 500
-		if len(failureReason) > maxReasonLength {
-			failureReason = failureReason[:500]
-		}
 		vals.Add("failure_reason", failureReason)
 	}
 	res, _ := http.PostForm(endpoint, vals)

@@ -186,7 +193,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
 	return errors.Err("invalid API response. Status code: %d", res.StatusCode)
 }

-func (s SyncManager) Start() error {
+func (s *SyncManager) Start() error {
 	syncCount := 0
 	for {
 		err := s.checkUsedSpace()

@@ -220,7 +227,7 @@ func (s SyncManager) Start() error {
 			ConcurrentVideos: s.ConcurrentVideos,
 			TakeOverExistingChannel: s.TakeOverExistingChannel,
 			Refill: s.Refill,
-			Manager: &s,
+			Manager: s,
 			LbrycrdString: s.LbrycrdString,
 			AwsS3ID: s.AwsS3ID,
 			AwsS3Secret: s.AwsS3Secret,

@@ -255,7 +262,7 @@ func (s SyncManager) Start() error {
 			ConcurrentVideos: s.ConcurrentVideos,
 			TakeOverExistingChannel: s.TakeOverExistingChannel,
 			Refill: s.Refill,
-			Manager: &s,
+			Manager: s,
 			LbrycrdString: s.LbrycrdString,
 			AwsS3ID: s.AwsS3ID,
 			AwsS3Secret: s.AwsS3Secret,

@@ -305,11 +312,11 @@ func (s SyncManager) Start() error {
 	return nil
 }

-func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
+func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
 	return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
 }

-func (s SyncManager) checkUsedSpace() error {
+func (s *SyncManager) checkUsedSpace() error {
 	usedPctile, err := GetUsedSpace(s.BlobsDir)
 	if err != nil {
 		return err

@@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error {
 		return nil
 	}

-	s.syncedVideosMux.Lock()
+	s.syncedVideosMux.RLock()
 	numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
-	s.syncedVideosMux.Unlock()
+	s.syncedVideosMux.RUnlock()
 	log.Debugf("We already published %d videos", numPublished)

 	if numOnSource-numPublished > s.Manager.VideosLimit {

@@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string {
 	return name + suffix
 }

-var publishedNamesMutex sync.RWMutex
-var publishedNames = map[string]bool{}
-
-func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
+func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
 	attempt := 0
 	for {
 		attempt++
 		name := getClaimNameFromTitle(title, attempt)
-		publishedNamesMutex.RLock()
-		_, exists := publishedNames[name]
-		publishedNamesMutex.RUnlock()
+
+		syncedVideosMux.Lock()
+		_, exists := claimNames[name]
 		if exists {
-			log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
+			log.Printf("name exists, retrying (%d attempts so far)", attempt)
+			syncedVideosMux.Unlock()
 			continue
 		}
+		claimNames[name] = false
+		syncedVideosMux.Unlock()

 		//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
 		if len(name) < 2 {
 			hasher := md5.New()

@@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string

 		response, err := daemon.Publish(name, filename, amount, options)
 		if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
-			publishedNamesMutex.Lock()
-			publishedNames[name] = true
-			publishedNamesMutex.Unlock()
+			syncedVideosMux.Lock()
+			claimNames[name] = true
+			syncedVideosMux.Unlock()
 			if err == nil {
 				return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
 			} else {
-				log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
+				log.Printf("name exists, retrying (%d attempts so far)", attempt)
 				continue
 			}
 		} else {

@@ -8,6 +8,8 @@ import (
 	"strings"
 	"time"

+	"sync"
+
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"

@@ -20,17 +22,19 @@ import (
 )

 type ucbVideo struct {
 	id string
 	title string
 	channel string
 	description string
 	publishedAt time.Time
 	dir string
+	claimNames map[string]bool
+	syncedVideosMux *sync.RWMutex
 }

-func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo {
+func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
 	p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
-	return ucbVideo{
+	return &ucbVideo{
 		id: id,
 		title: title,
 		description: description,

@@ -40,19 +44,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi
 	}
 }

There is only one mutex for the whole map; the pattern is the same one you used locally in the function where a name is generated for the claim.
I do, however, agree with your comment about passing down the mutex, and I'm going to correct that.
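For reference, a minimal sketch of the pattern being described above: one shared claim-name map guarded by one shared RWMutex, handed down to the code that generates names rather than duplicated per video. The names in this snippet are illustrative only and are not taken from this PR.

package main

import "sync"

// One shared map plus one shared mutex, passed to every worker that needs it.
var (
	claimNamesMux sync.RWMutex
	claimNames    = map[string]bool{} // claim name -> published?
)

// reserveName atomically reserves a name; callers retry with a new suffix if it is taken.
func reserveName(name string) bool {
	claimNamesMux.Lock()
	defer claimNamesMux.Unlock()
	if _, taken := claimNames[name]; taken {
		return false
	}
	claimNames[name] = false // reserved but not yet published
	return true
}

func main() {
	_ = reserveName("some-claim-name")
}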
-func (v ucbVideo) ID() string {
+func (v *ucbVideo) ID() string {
 	return v.id
 }

-func (v ucbVideo) PlaylistPosition() int {
+func (v *ucbVideo) PlaylistPosition() int {
 	return 0
 }

-func (v ucbVideo) IDAndNum() string {
+func (v *ucbVideo) IDAndNum() string {
 	return v.ID() + " (?)"
 }

-func (v ucbVideo) PublishedAt() time.Time {
+func (v *ucbVideo) PublishedAt() time.Time {
 	return v.publishedAt
 	//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
 	//matches := r.FindStringSubmatch(v.title)

@@ -65,11 +69,11 @@ func (v ucbVideo) PublishedAt() time.Time {
 	//return time.Now()
 }

-func (v ucbVideo) getFilename() string {
+func (v *ucbVideo) getFilename() string {
 	return v.dir + "/" + v.id + ".mp4"
 }

-func (v ucbVideo) getClaimName(attempt int) string {
+func (v *ucbVideo) getClaimName(attempt int) string {
 	reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
 	suffix := ""
 	if attempt > 1 {

@@ -98,7 +102,7 @@ func (v ucbVideo) getClaimName(attempt int) string {
 	return name + suffix
 }

-func (v ucbVideo) getAbbrevDescription() string {
+func (v *ucbVideo) getAbbrevDescription() string {
 	maxLines := 10
 	description := strings.TrimSpace(v.description)
 	if strings.Count(description, "\n") < maxLines {

@@ -107,7 +111,7 @@ func (v ucbVideo) getAbbrevDescription() string {
 	return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
 }

-func (v ucbVideo) download() error {
+func (v *ucbVideo) download() error {
 	videoPath := v.getFilename()

 	_, err := os.Stat(videoPath)

@@ -146,7 +150,7 @@ func (v ucbVideo) download() error {
 	return nil
 }

-func (v ucbVideo) saveThumbnail() error {
+func (v *ucbVideo) saveThumbnail() error {
 	resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
 	if err != nil {
 		return err

@@ -170,7 +174,7 @@ func (v ucbVideo) saveThumbnail() error {
 	return err
 }

-func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
+func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
 	options := jsonrpc.PublishOptions{
 		Title: &v.title,
 		Author: strPtr("UC Berkeley"),

@@ -183,10 +187,16 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
 		ChangeAddress: &claimAddress,
 	}

-	return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
+	return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
 }

-func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
+func (v *ucbVideo) Size() *int64 {
+	return nil
+}
+
+func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
+	v.claimNames = claimNames
+	v.syncedVideosMux = syncedVideosMux
 	//download and thumbnail can be done in parallel
 	err := v.download()
 	if err != nil {

@@ -11,6 +11,8 @@ import (
 	"strings"
 	"time"

+	"sync"
+
 	"github.com/lbryio/lbry.go/errors"
 	"github.com/lbryio/lbry.go/jsonrpc"

@@ -25,13 +27,16 @@ type YoutubeVideo struct {
 	title string
 	description string
 	playlistPosition int64
+	size *int64
 	publishedAt time.Time
 	dir string
+	claimNames map[string]bool
+	syncedVideosMux *sync.RWMutex
 }

-func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo {
+func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
 	publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
-	return YoutubeVideo{
+	return &YoutubeVideo{
 		id: snippet.ResourceId.VideoId,
 		title: snippet.Title,
 		description: snippet.Description,

@@ -42,23 +47,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You
 	}
 }

-func (v YoutubeVideo) ID() string {
+func (v *YoutubeVideo) ID() string {
 	return v.id
 }

-func (v YoutubeVideo) PlaylistPosition() int {
+func (v *YoutubeVideo) PlaylistPosition() int {
 	return int(v.playlistPosition)
 }

-func (v YoutubeVideo) IDAndNum() string {
+func (v *YoutubeVideo) IDAndNum() string {
 	return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
 }

-func (v YoutubeVideo) PublishedAt() time.Time {
+func (v *YoutubeVideo) PublishedAt() time.Time {
 	return v.publishedAt
 }

-func (v YoutubeVideo) getFilename() string {
+func (v *YoutubeVideo) getFilename() string {
 	maxLen := 30
 	reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)

@@ -85,7 +90,7 @@ func (v YoutubeVideo) getFilename() string {
 	return v.videoDir() + "/" + name + ".mp4"
 }

-func (v YoutubeVideo) getAbbrevDescription() string {
+func (v *YoutubeVideo) getAbbrevDescription() string {
 	maxLines := 10
 	description := strings.TrimSpace(v.description)
 	if strings.Count(description, "\n") < maxLines {

@@ -94,7 +99,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
 	return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
 }

-func (v YoutubeVideo) download() error {
+func (v *YoutubeVideo) download() error {
 	videoPath := v.getFilename()

 	err := os.Mkdir(v.videoDir(), 0750)

@@ -118,20 +123,19 @@ func (v YoutubeVideo) download() error {

 	var downloadedFile *os.File
 	downloadedFile, err = os.Create(videoPath)
-	defer downloadedFile.Close()
 	if err != nil {
 		return err
 	}

-	return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
+	defer downloadedFile.Close()
+
+	return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile)
 }

-func (v YoutubeVideo) videoDir() string {
+func (v *YoutubeVideo) videoDir() string {
 	return v.dir + "/" + v.id
 }

-func (v YoutubeVideo) delete() error {
+func (v *YoutubeVideo) delete() error {
 	videoPath := v.getFilename()
 	err := os.Remove(videoPath)
 	if err != nil {

@@ -142,7 +146,7 @@ func (v YoutubeVideo) delete() error {
 	return nil
 }

-func (v YoutubeVideo) triggerThumbnailSave() error {
+func (v *YoutubeVideo) triggerThumbnailSave() error {
 	client := &http.Client{Timeout: 30 * time.Second}

 	params, err := json.Marshal(map[string]string{"videoid": v.id})

@@ -167,17 +171,17 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
 	}

 	var decoded struct {
-		error int `json:"error"`
-		url string `json:"url,omitempty"`
-		message string `json:"message,omitempty"`
+		Error int `json:"error"`
+		Url string `json:"url,omitempty"`
+		Message string `json:"message,omitempty"`
 	}
 	err = json.Unmarshal(contents, &decoded)
 	if err != nil {
 		return err
 	}

-	if decoded.error != 0 {
-		return errors.Err("error creating thumbnail: " + decoded.message)
+	if decoded.Error != 0 {
+		return errors.Err("error creating thumbnail: " + decoded.Message)
 	}

 	return nil

@@ -185,7 +189,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {

 func strPtr(s string) *string { return &s }

-func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
+func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
 	if channelID == "" {
 		return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
 	}

@@ -200,10 +204,16 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
 		ChangeAddress: &claimAddress,
 		ChannelID: &channelID,
 	}
-	return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
+	return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
 }

-func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
+func (v *YoutubeVideo) Size() *int64 {
+	return v.size
+}
+
+func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
+	v.claimNames = claimNames
+	v.syncedVideosMux = syncedVideosMux
 	//download and thumbnail can be done in parallel
 	err := v.download()
 	if err != nil {

@@ -215,7 +225,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
 	if err != nil {
 		return nil, err
 	}
-	if fi.Size() > int64(maxVideoSize)*1024*1024 {
+	videoSize := fi.Size()
+	v.size = &videoSize
+
+	if videoSize > int64(maxVideoSize)*1024*1024 {
 		//delete the video and ignore the error
 		_ = v.delete()
 		return nil, errors.Err("the video is too big to sync, skipping for now")

ytsync/ytsync.go
@@ -38,14 +38,16 @@ import (
 const (
 	channelClaimAmount = 0.01
 	publishAmount = 0.01
+	maxReasonLength = 500
 )

 type video interface {
+	Size() *int64
 	ID() string
 	IDAndNum() string
 	PlaylistPosition() int
 	PublishedAt() time.Time
-	Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error)
+	Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
 }

 // sorting videos

@@ -76,8 +78,9 @@ type Sync struct {
 	claimAddress string
 	videoDirectory string
 	db *redisdb.DB
+	syncedVideosMux *sync.RWMutex
 	syncedVideos map[string]syncedVideo
-	syncedVideosMux *sync.Mutex
+	claimNames map[string]bool
 	grp *stop.Group
 	lbryChannelID string

@@ -85,7 +88,7 @@ type Sync struct {
 	queue chan video
 }

-func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
+func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
 	s.syncedVideosMux.Lock()
 	defer s.syncedVideosMux.Unlock()
 	s.syncedVideos[videoID] = syncedVideo{

@@ -93,6 +96,9 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
 		Published: published,
 		FailureReason: failureReason,
 	}
+	if claimName != "" {
+		s.claimNames[claimName] = true
+	}
 }

 // SendErrorToSlack Sends an error message to the default channel and to the process log.

@@ -223,7 +229,7 @@ func (s *Sync) FullCycle() (e error) {
 	if s.YoutubeChannelID == "" {
 		return errors.Err("channel ID not provided")
 	}
-	s.syncedVideosMux = &sync.Mutex{}
+	s.syncedVideosMux = &sync.RWMutex{}
 	s.walletMux = &sync.Mutex{}
 	s.db = redisdb.New()
 	s.grp = stop.New()

@@ -236,12 +242,13 @@ func (s *Sync) FullCycle() (e error) {
 		log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
 		s.grp.Stop()
 	}()
-	syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
+	syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
 	if err != nil {
 		return err
 	}
 	s.syncedVideosMux.Lock()
 	s.syncedVideos = syncedVideos
+	s.claimNames = claimNames
 	s.syncedVideosMux.Unlock()

 	defer s.updateChannelStatus(&e)

@@ -298,14 +305,15 @@ func (s *Sync) updateChannelStatus(e *error) {
 		if util.SubstringInSlice((*e).Error(), noFailConditions) {
 			return
 		}
-		_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
+		failureReason := (*e).Error()
+		_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
 		if err != nil {
 			msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
 			err = errors.Prefix(msg, err)
 			*e = errors.Prefix(err.Error(), *e)
 		}
 	} else if !s.IsInterrupted() {
-		_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
+		_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
 		if err != nil {
 			*e = err
 		}

@@ -343,7 +351,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
 	err := s.uploadWallet()
 	if err != nil {
 		if *e == nil {
-			e = &err
+			e = &err //not 100% sure
 			return
 		} else {
 			*e = errors.Prefix("failure uploading wallet: ", *e)

@@ -357,9 +365,71 @@ func logShutdownError(shutdownErr error) {
 	SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
 }

+func hasDupes(claims []jsonrpc.Claim) (bool, error) {
+	videoIDs := make(map[string]interface{})
+	for _, c := range claims {
+		if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
+			continue
+		}
+		if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
+			return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
+		}
+		tn := *c.Value.Stream.Metadata.Thumbnail
+		videoID := tn[:strings.LastIndex(tn, "/")+1]
+		_, ok := videoIDs[videoID]
+		if !ok {
+			videoIDs[videoID] = nil
+			continue
+		}
+		return true, nil
+	}
+	return false, nil
+}
+
+//publishesCount counts the amount of videos published so far
+func publishesCount(claims []jsonrpc.Claim) (int, error) {
+	count := 0
+	for _, c := range claims {
+		if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
+			continue
+		}
+		if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
+			return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
+		}
+		count++
+	}
+	return count, nil
+}
+
 func (s *Sync) doSync() error {
 	var err error

+	claims, err := s.daemon.ClaimListMine()
+	if err != nil {
+		return errors.Prefix("cannot list claims: ", err)
+	}
+	hasDupes, err := hasDupes(*claims)
+	if err != nil {
+		return errors.Prefix("error checking for duplicates: ", err)
+	}
+	if hasDupes {
+		return errors.Err("channel has duplicates! Manual fix required")
+	}
+	pubsOnWallet, err := publishesCount(*claims)
+	if err != nil {
+		return errors.Prefix("error counting claims: ", err)
+	}
+	pubsOnDB := 0
+	for _, sv := range s.syncedVideos {
+		if sv.Published {
+			pubsOnDB++
+		}
+	}
+	if pubsOnWallet > pubsOnDB {
+		return errors.Err("not all published videos are in the database")
+	}
+	if pubsOnWallet < pubsOnDB {
+		SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
+	}
 	err = s.walletSetup()
 	if err != nil {
 		return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)

@@ -371,10 +441,10 @@ func (s *Sync) doSync() error {

 	for i := 0; i < s.ConcurrentVideos; i++ {
 		s.grp.Add(1)
-		go func() {
+		go func(i int) {
 			defer s.grp.Done()
Why does this function return a pointer to an error?
Because the function is run as a deferred call and needs to read/write the error being returned by the original function. Is this a bad pattern?
Well, actually, to be correct it's not returning a pointer to an error; it's accepting a pointer to an error so that I can change it from within.
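To make the exchange above concrete, here is a small, self-contained sketch of that Go idiom (illustrative names only, not code from this PR): a deferred function takes *error so it can inspect and overwrite the error the enclosing function is about to return, which is exactly how defer s.updateChannelStatus(&e) works in this diff.

package main

import (
	"errors"
	"fmt"
)

func doCleanup() error { return errors.New("cleanup failed") }

// cleanup accepts a pointer to the caller's named return value so a deferred
// call can read the error about to be returned and replace or wrap it.
func cleanup(e *error) {
	if cleanupErr := doCleanup(); cleanupErr != nil && *e == nil {
		*e = cleanupErr // surface the cleanup failure only if nothing else failed
	}
}

func work() (err error) {
	defer cleanup(&err) // &err lets the deferred func rewrite the returned error
	return nil
}

func main() {
	fmt.Println(work()) // prints "cleanup failed"
}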
 			s.startWorker(i)
-		}()
+		}(i)
 	}

 	if s.LbryChannelName == "@UCBerkeley" {

@@ -469,8 +539,8 @@ func (s *Sync) startWorker(workerNum int) {
 			}
 			SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
 		}
-		s.AppendSyncedVideo(v.ID(), false, err.Error())
-		err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
+		s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
+		err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
 		if err != nil {
 			SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
 		}

@@ -627,9 +697,9 @@ func (s *Sync) processVideo(v video) (err error) {
 		log.Println(v.ID() + " took " + time.Since(start).String())
 	}(time.Now())

-	s.syncedVideosMux.Lock()
+	s.syncedVideosMux.RLock()
 	sv, ok := s.syncedVideos[v.ID()]
-	s.syncedVideosMux.Unlock()
+	s.syncedVideosMux.RUnlock()
 	alreadyPublished := ok && sv.Published

 	neverRetryFailures := []string{

@@ -667,15 +737,16 @@ func (s *Sync) processVideo(v video) (err error) {
 	if err != nil {
 		return err
 	}
-	summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
+	summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
 	if err != nil {
 		return err
 	}
-	err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
+	err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
 	if err != nil {
-		return err
+		SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
 	}
-	s.AppendSyncedVideo(v.ID(), true, "")
+
+	s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)

 	return nil
 }

Why are these fields here if they never get used?
And having syncedVideosMux on each video is weird. What's going on?
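One way to read the reviewer's concern, as a hedged sketch (illustrative types only, not the actual ytsync structs): instead of every video struct carrying claimNames and syncedVideosMux as fields, the shared state could stay on the manager side and be passed to the publish call as parameters, so the video type holds no synchronization state.

package main

import "sync"

// Illustrative video type with no sync-related fields.
type video struct {
	id    string
	title string
}

// publish receives the shared map and mutex as arguments rather than reading
// them from fields on the video, keeping the video type free of shared state.
func (v *video) publish(claimNames map[string]bool, mux *sync.RWMutex) error {
	mux.Lock()
	claimNames[v.title] = false // reserve the name under the shared lock
	mux.Unlock()
	// ... actual publishing would happen here ...
	return nil
}

func main() {
	shared := map[string]bool{}
	var mux sync.RWMutex
	v := &video{id: "abc123", title: "example-claim-name"}
	_ = v.publish(shared, &mux)
}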