rename selfsync to ytsync

add video marking support
This commit is contained in:
Niko Storni 2018-07-20 19:56:36 -04:00
parent f800e77e4c
commit 915862c324
No known key found for this signature in database
GPG key ID: F37FE63398800368
6 changed files with 93 additions and 39 deletions

View file

@ -29,28 +29,28 @@ var (
)
func init() {
var selfSyncCmd = &cobra.Command{
Use: "selfsync",
var ytSyncCmd = &cobra.Command{
Use: "ytsync",
Args: cobra.RangeArgs(0, 0),
Short: "Publish youtube channels into LBRY network automatically.",
Run: selfSync,
Run: ytSync,
}
selfSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
selfSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
selfSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
selfSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
selfSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
selfSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)")
selfSyncCmd.Flags().StringVar(&syncStatus, "status", sync.StatusQueued, "Specify which queue to pull from. Overrides --update (Default: queued)")
selfSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
selfSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
selfSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
selfSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently (Default: 1)")
ytSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
ytSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)")
ytSyncCmd.Flags().StringVar(&syncStatus, "status", sync.StatusQueued, "Specify which queue to pull from. Overrides --update (Default: queued)")
ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
ytSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
ytSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently (Default: 1)")
RootCmd.AddCommand(selfSyncCmd)
RootCmd.AddCommand(ytSyncCmd)
}
func selfSync(cmd *cobra.Command, args []string) {
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {

View file

@ -117,7 +117,49 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response")
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
VideoStatusPublished = "published"
VideoSStatusFailed = "failed"
)
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, details string) error {
endpoint := s.apiURL + "/yt/track_video"
vals := url.Values{
"youtube_channel_id": {channelID},
"youtube_video_id": {videoID},
"status": {status},
"auth_token": {s.apiToken},
}
if status == VideoStatusPublished {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
}
if details != "" {
vals.Add("details", details)
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiSyncUpdateResponse
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
func (s SyncManager) Start() error {

View file

@ -16,6 +16,11 @@ import (
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
type SyncSummary struct {
ClaimID string
ClaimName string
}
func getClaimNameFromTitle(title string, attempt int) string {
suffix := ""
if attempt > 1 {
@ -47,7 +52,7 @@ func getClaimNameFromTitle(title string, attempt int) string {
var publishedNamesMutex sync.RWMutex
var publishedNames = map[string]bool{}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) error {
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
attempt := 0
for {
attempt++
@ -67,19 +72,19 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt)
}
_, err := daemon.Publish(name, filename, amount, options)
response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
publishedNamesMutex.Lock()
publishedNames[name] = true
publishedNamesMutex.Unlock()
if err == nil {
return nil
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
continue
}
} else {
return err
return nil, err
}
}
}

View file

@ -170,7 +170,7 @@ func (v ucbVideo) saveThumbnail() error {
return err
}
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: strPtr("UC Berkeley"),
@ -188,11 +188,11 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
}
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
return errors.Prefix("download error", err)
return nil, errors.Prefix("download error", err)
}
log.Debugln("Downloaded " + v.id)
@ -202,10 +202,10 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float
//}
//log.Debugln("Created thumbnail for " + v.id)
err = v.publish(daemon, claimAddress, amount, channelName)
summary, err := v.publish(daemon, claimAddress, amount, channelName)
if err != nil {
return errors.Prefix("publish error", err)
return nil, errors.Prefix("publish error", err)
}
return nil
return summary, nil
}

View file

@ -181,7 +181,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: &v.channelTitle,
@ -199,38 +199,38 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
}
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
return errors.Prefix("download error", err)
return nil, errors.Prefix("download error", err)
}
log.Debugln("Downloaded " + v.id)
fi, err := os.Stat(v.getFilename())
if err != nil {
return err
return nil, err
}
if fi.Size() > 2*1024*1024*1024 {
//delete the video and ignore the error
_ = v.delete()
return errors.Err("video is bigger than 2GB, skipping for now")
return nil, errors.Err("video is bigger than 2GB, skipping for now")
}
err = v.triggerThumbnailSave()
if err != nil {
return errors.Prefix("thumbnail error", err)
return nil, errors.Prefix("thumbnail error", err)
}
log.Debugln("Created thumbnail for " + v.id)
err = v.publish(daemon, claimAddress, amount, channelName)
summary, err := v.publish(daemon, claimAddress, amount, channelName)
//delete the video in all cases (and ignore the error)
_ = v.delete()
if err != nil {
return errors.Prefix("publish error", err)
return nil, errors.Prefix("publish error", err)
}
return nil
return summary, nil
}
// sorting videos

View file

@ -39,7 +39,7 @@ type video interface {
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string) error
Sync(*jsonrpc.Client, string, float64, string) (*sources.SyncSummary, error)
}
// sorting videos
@ -316,6 +316,10 @@ func (s *Sync) startWorker(workerNum int) {
}
util.SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoSStatusFailed, "", "", err.Error())
if err != nil {
util.SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
}
break
}
@ -483,11 +487,14 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " is old: skipping")
return nil
}
err = v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName)
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName)
if err != nil {
return err
}
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
if err != nil {
util.SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
err = s.db.SetPublished(v.ID())
if err != nil {
return err