diff --git a/ytsync/manager.go b/ytsync/manager.go index 4d5e6cf..9e72d90 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -90,13 +90,19 @@ func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { return response.Data, nil } -type apiSyncUpdateResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data null.String `json:"data"` +type apiChannelStatusResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data []syncedVideo `json:"data"` } -func (s SyncManager) setChannelSyncStatus(channelID string, status string) error { +type syncedVideo struct { + VideoID string `json:"video_id"` + Published bool `json:"published"` + FailureReason string `json:"failure_reason"` +} + +func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) { endpoint := s.ApiURL + "/yt/channel_status" res, _ := http.PostForm(endpoint, url.Values{ @@ -107,18 +113,22 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error }) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) - var response apiSyncUpdateResponse + var response apiChannelStatusResponse err := json.Unmarshal(body, &response) if err != nil { - return err + return nil, err } if !response.Error.IsNull() { - return errors.Err(response.Error.String) + return nil, errors.Err(response.Error.String) } - if !response.Data.IsNull() && response.Data.String == "ok" { - return nil + if response.Data != nil { + svs := make(map[string]syncedVideo) + for _, v := range response.Data { + svs[v.VideoID] = v + } + return svs, nil } - return errors.Err("invalid API response. Status code: %d", res.StatusCode) + return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) } const ( @@ -149,7 +159,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st res, _ := http.PostForm(endpoint, vals) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) - var response apiSyncUpdateResponse + var response struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data null.String `json:"data"` + } err := json.Unmarshal(body, &response) if err != nil { return err @@ -237,7 +251,7 @@ func (s SyncManager) Start() error { time.Sleep(5 * time.Minute) } for i, sync := range syncs { - SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount) + SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) err := sync.FullCycle() if err != nil { fatalErrors := []string{ @@ -251,7 +265,7 @@ func (s SyncManager) Start() error { } SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) } - SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount) + SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) syncCount++ if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { shouldInterruptLoop = true diff --git a/ytsync/setup.go b/ytsync/setup.go index 0046d8a..7a40ccd 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -14,8 +14,8 @@ import ( func (s *Sync) walletSetup() error { //prevent unnecessary concurrent execution - s.mux.Lock() - defer s.mux.Unlock() + s.walletMux.Lock() + defer s.walletMux.Unlock() err := s.ensureChannelOwnership() if err != nil { return err @@ -30,31 +30,32 @@ func (s *Sync) walletSetup() error { balance := decimal.Decimal(*balanceResp) log.Debugf("Starting balance is %s", balance.String()) - var numOnSource uint64 + var numOnSource int if s.LbryChannelName == "@UCBerkeley" { numOnSource = 10104 } else { - numOnSource, err = s.CountVideos() + n, err := s.CountVideos() if err != nil { return err } + numOnSource = int(n) } log.Debugf("Source channel has %d videos", numOnSource) if numOnSource == 0 { return nil } - numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) - if err != nil { - return err - } + s.syncedVideosMux.Lock() + numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... + s.syncedVideosMux.Unlock() log.Debugf("We already published %d videos", numPublished) - if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { - numOnSource = uint64(s.Manager.VideosLimit) + if numOnSource-numPublished > s.Manager.VideosLimit { + numOnSource = s.Manager.VideosLimit } + minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount - if numPublished > numOnSource { + if numPublished > numOnSource && balance.LessThan(decimal.NewFromFloat(1)) { SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource) minBalance = 1 //since we ended up in this function it means some juice is still needed } @@ -95,7 +96,6 @@ func (s *Sync) walletSetup() error { } func (s *Sync) ensureEnoughUTXOs() error { - utxolist, err := s.daemon.UTXOList() if err != nil { return err @@ -103,14 +103,6 @@ func (s *Sync) ensureEnoughUTXOs() error { return errors.Err("no response") } - if !allUTXOsConfirmed(utxolist) { - log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run - err := s.waitUntilUTXOsConfirmed() - if err != nil { - return err - } - } - target := 40 count := 0 @@ -141,12 +133,13 @@ func (s *Sync) ensureEnoughUTXOs() error { return errors.Err("no response") } - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses") - time.Sleep(wait) - - log.Println("Creating UTXOs and waiting for them to be confirmed") - err = s.waitUntilUTXOsConfirmed() + err = s.waitForNewBlock() + if err != nil { + return err + } + } else if !allUTXOsConfirmed(utxolist) { + log.Println("Waiting for previous txns to confirm") + err := s.waitForNewBlock() if err != nil { return err } @@ -155,28 +148,31 @@ func (s *Sync) ensureEnoughUTXOs() error { return nil } -func (s *Sync) waitUntilUTXOsConfirmed() error { - origin := time.Now() - for { - r, err := s.daemon.UTXOList() +func (s *Sync) waitForNewBlock() error { + status, err := s.daemon.Status() + if err != nil { + return err + } + + for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 { + time.Sleep(5 * time.Second) + status, err = s.daemon.Status() if err != nil { return err - } else if r == nil { - return errors.Err("no response") } - - if allUTXOsConfirmed(r) { - return nil - } - if time.Now().After(origin.Add(15 * time.Minute)) { - //lbryum is messing with us or something. restart the daemon - //this could also be a very long block - SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String()) - } - wait := 30 * time.Second - log.Println("Waiting " + wait.String() + "...") - time.Sleep(wait) } + currentBlock := status.Wallet.Blocks + for i := 0; status.Wallet.Blocks <= currentBlock; i++ { + if i%3 == 0 { + log.Printf("Waiting for new block (%d)...", currentBlock+1) + } + time.Sleep(10 * time.Second) + status, err = s.daemon.Status() + if err != nil { + return err + } + } + return nil } func (s *Sync) ensureChannelOwnership() error { @@ -194,6 +190,7 @@ func (s *Sync) ensureChannelOwnership() error { isChannelMine := false for _, channel := range *channels { if channel.Name == s.LbryChannelName { + s.lbryChannelID = channel.ClaimID isChannelMine = true } else { return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?") @@ -232,16 +229,11 @@ func (s *Sync) ensureChannelOwnership() error { s.addCredits(channelBidAmount + 0.1) } - _, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) + c, err := s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) if err != nil { return err } - - // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for channel claim to go through") - time.Sleep(wait) - + s.lbryChannelID = c.ClaimID return nil } diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go index 514b676..f7ead6a 100644 --- a/ytsync/sources/ucbVideo.go +++ b/ytsync/sources/ucbVideo.go @@ -170,25 +170,23 @@ func (v ucbVideo) saveThumbnail() error { return err } -func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { +func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { options := jsonrpc.PublishOptions{ - Title: &v.title, - Author: strPtr("UC Berkeley"), - Description: strPtr(v.getAbbrevDescription()), - Language: strPtr("en"), - ClaimAddress: &claimAddress, - Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), - License: strPtr("see description"), - } - - if channelName != "" { - options.ChannelName = &channelName + Title: &v.title, + Author: strPtr("UC Berkeley"), + Description: strPtr(v.getAbbrevDescription()), + Language: strPtr("en"), + ClaimAddress: &claimAddress, + Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), + License: strPtr("see description"), + ChannelID: &channelID, + ChangeAddress: &claimAddress, } return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } -func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) { +func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -202,7 +200,7 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float //} //log.Debugln("Created thumbnail for " + v.id) - summary, err := v.publish(daemon, claimAddress, amount, channelName) + summary, err := v.publish(daemon, claimAddress, amount, channelID) if err != nil { return nil, errors.Prefix("publish error", err) } diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index 0f24419..9fb091e 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -66,7 +66,7 @@ func (v YoutubeVideo) getFilename() string { name := chunks[0] if len(name) > maxLen { - return name[:maxLen] + name = name[:maxLen] } for _, chunk := range chunks[1:] { @@ -185,7 +185,10 @@ func (v YoutubeVideo) triggerThumbnailSave() error { func strPtr(s string) *string { return &s } -func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { +func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { + if channelID == "" { + return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? + } options := jsonrpc.PublishOptions{ Title: &v.title, Author: &v.channelTitle, @@ -195,15 +198,12 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), License: strPtr("Copyrighted (contact author)"), ChangeAddress: &claimAddress, + ChannelID: &channelID, } - if channelName != "" { - options.ChannelName = &channelName - } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } -func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) { +func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -227,7 +227,7 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f } log.Debugln("Created thumbnail for " + v.id) - summary, err := v.publish(daemon, claimAddress, amount, channelName) + summary, err := v.publish(daemon, claimAddress, amount, channelID) //delete the video in all cases (and ignore the error) _ = v.delete() if err != nil { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 0ee0056..c44d9c6 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -61,16 +61,27 @@ type Sync struct { Refill int Manager *SyncManager - daemon *jsonrpc.Client - claimAddress string - videoDirectory string - db *redisdb.DB + daemon *jsonrpc.Client + claimAddress string + videoDirectory string + db *redisdb.DB + syncedVideos map[string]syncedVideo + syncedVideosMux *sync.Mutex + grp *stop.Group + lbryChannelID string - grp *stop.Group + walletMux *sync.Mutex + queue chan video +} - mux sync.Mutex - wg sync.WaitGroup - queue chan video +func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { + s.syncedVideosMux.Lock() + defer s.syncedVideosMux.Unlock() + s.syncedVideos[videoID] = syncedVideo{ + VideoID: videoID, + Published: published, + FailureReason: failureReason, + } } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -110,10 +121,26 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) + s.syncedVideosMux = &sync.Mutex{} + s.walletMux = &sync.Mutex{} + s.db = redisdb.New() + s.grp = stop.New() + s.queue = make(chan video) + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + go func() { + <-interruptChan + log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") + s.grp.Stop() + }() + syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) if err != nil { return err } + s.syncedVideosMux.Lock() + s.syncedVideos = syncedVideos + s.syncedVideosMux.Unlock() + defer func() { if e != nil { //conditions for which a channel shouldn't be marked as failed @@ -123,14 +150,14 @@ func (s *Sync) FullCycle() (e error) { if util.SubstringInSlice(e.Error(), noFailConditions) { return } - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) e = errors.Prefix(err.Error(), e) } } else if !s.IsInterrupted() { - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) if err != nil { e = err } @@ -181,18 +208,6 @@ func (s *Sync) FullCycle() (e error) { return errors.Wrap(err, 0) } - s.db = redisdb.New() - s.grp = stop.New() - s.queue = make(chan video) - - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - go func() { - <-interruptChan - log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") - s.grp.Stop() - }() - log.Printf("Starting daemon") err = startDaemonViaSystemd() if err != nil { @@ -247,7 +262,11 @@ func (s *Sync) doSync() error { } for i := 0; i < s.ConcurrentVideos; i++ { - go s.startWorker(i) + s.grp.Add(1) + go func() { + defer s.grp.Done() + s.startWorker(i) + }() } if s.LbryChannelName == "@UCBerkeley" { @@ -256,14 +275,11 @@ func (s *Sync) doSync() error { err = s.enqueueYoutubeVideos() } close(s.queue) - s.wg.Wait() + s.grp.Wait() return err } func (s *Sync) startWorker(workerNum int) { - s.wg.Add(1) - defer s.wg.Done() - var v video var more bool @@ -300,6 +316,7 @@ func (s *Sync) startWorker(workerNum int) { "no space left on device", "NotEnoughFunds", "Cannot publish using channel", + "more than 90% of the space has been used.", } if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { s.grp.Stop() @@ -336,6 +353,7 @@ func (s *Sync) startWorker(workerNum int) { } SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } + s.AppendSyncedVideo(v.ID(), false, err.Error()) err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) @@ -493,10 +511,32 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - alreadyPublished, err := s.db.IsPublished(v.ID()) + s.syncedVideosMux.Lock() + sv, ok := s.syncedVideos[v.ID()] + s.syncedVideosMux.Unlock() + alreadyPublished := ok && sv.Published + + neverRetryFailures := []string{ + "Error extracting sts from embedded url response", + "the video is too big to sync, skipping for now", + } + if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) { + log.Println(v.ID() + " can't ever be published") + return nil + } + + //TODO: remove this after a few runs... + alreadyPublishedOld, err := s.db.IsPublished(v.ID()) if err != nil { return err } + //TODO: remove this after a few runs... + if alreadyPublishedOld && !alreadyPublished { + //seems like something in the migration of blobs didn't go perfectly right so warn about it! + SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s", + s.YoutubeChannelID, v.ID()) + return nil + } if alreadyPublished { log.Println(v.ID() + " already published") @@ -507,18 +547,19 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " is old: skipping") return nil } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName, s.Manager.MaxVideoSize) + err = s.Manager.checkUsedSpace() + if err != nil { + return err + } + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) if err != nil { return err } err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "") - if err != nil { - SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) - } - err = s.db.SetPublished(v.ID()) if err != nil { return err } + s.AppendSyncedVideo(v.ID(), true, "") return nil }