From c428b5ef77ba04d5c1f1086952bf5f4042cf63a6 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Mon, 30 Jul 2018 19:19:12 -0400 Subject: [PATCH 01/12] remove redis dependency use video statuses --- ytsync/manager.go | 31 +++++++++++++++++++++++++------ ytsync/setup.go | 8 +++++++- ytsync/ytsync.go | 28 ++++++++++++++++------------ 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 4d5e6cf..fd30074 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -96,7 +96,13 @@ type apiSyncUpdateResponse struct { Data null.String `json:"data"` } -func (s SyncManager) setChannelSyncStatus(channelID string, status string) error { +type syncedVideo struct { + VideoID string + Published bool + FailureReason string +} + +func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[string]syncedVideo, error) { endpoint := s.ApiURL + "/yt/channel_status" res, _ := http.PostForm(endpoint, url.Values{ @@ -110,15 +116,28 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error var response apiSyncUpdateResponse err := json.Unmarshal(body, &response) if err != nil { - return err + return nil, err } if !response.Error.IsNull() { - return errors.Err(response.Error.String) + return nil, errors.Err(response.Error.String) } - if !response.Data.IsNull() && response.Data.String == "ok" { - return nil + if !response.Data.IsNull() { + if response.Data.String == "ok" { + return nil, nil + } + var sv []syncedVideo + err := json.Unmarshal([]byte(response.Data.String), &sv) + if err != nil { + return nil, errors.Err("could not parse synced videos") + } + + svs := make(map[string]syncedVideo) + for _, v := range sv { + svs[v.VideoID] = v + } + return svs, nil } - return errors.Err("invalid API response. Status code: %d", res.StatusCode) + return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) } const ( diff --git a/ytsync/setup.go b/ytsync/setup.go index 0046d8a..f8cc8fe 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -44,7 +44,8 @@ func (s *Sync) walletSetup() error { return nil } - numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) + //numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) + numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... if err != nil { return err } @@ -53,6 +54,11 @@ func (s *Sync) walletSetup() error { if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { numOnSource = uint64(s.Manager.VideosLimit) } + + //TODO: get rid of this as soon as we compute this condition using the database in a more reliable way + if numPublished >= numOnSource { + return errors.Err("channel is already up to date") + } minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount if numPublished > numOnSource { SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource) diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 0ee0056..362d115 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -21,7 +21,6 @@ import ( "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" @@ -64,9 +63,9 @@ type Sync struct { daemon *jsonrpc.Client claimAddress string videoDirectory string - db *redisdb.DB - - grp *stop.Group + //db *redisdb.DB + syncedVideos map[string]syncedVideo + grp *stop.Group mux sync.Mutex wg sync.WaitGroup @@ -110,7 +109,7 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) + syncedVideos, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) if err != nil { return err } @@ -119,18 +118,19 @@ func (s *Sync) FullCycle() (e error) { //conditions for which a channel shouldn't be marked as failed noFailConditions := []string{ "this youtube channel is being managed by another server", + "channel is already up to date", } if util.SubstringInSlice(e.Error(), noFailConditions) { return } - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) + _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) e = errors.Prefix(err.Error(), e) } } else if !s.IsInterrupted() { - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) + _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) if err != nil { e = err } @@ -181,7 +181,8 @@ func (s *Sync) FullCycle() (e error) { return errors.Wrap(err, 0) } - s.db = redisdb.New() + //s.db = redisdb.New() + s.syncedVideos = syncedVideos s.grp = stop.New() s.queue = make(chan video) @@ -493,9 +494,12 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - alreadyPublished, err := s.db.IsPublished(v.ID()) - if err != nil { - return err + sv, ok := s.syncedVideos[v.ID()] + alreadyPublished := ok && sv.Published + + if ok && !sv.Published && strings.Contains(sv.FailureReason, "Error extracting sts from embedded url response") { + log.Println(v.ID() + " can't be published") + return nil } if alreadyPublished { @@ -515,7 +519,7 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } - err = s.db.SetPublished(v.ID()) + //err = s.db.SetPublished(v.ID()) if err != nil { return err } From b3f7f93f5fa5a836a9b779ba5d128fae02ce172e Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 31 Jul 2018 13:42:20 -0400 Subject: [PATCH 02/12] refactor and fix api response handlers reinstate redis db to detect inconsistencies --- ytsync/manager.go | 38 +++++++++++++++++++------------------- ytsync/ytsync.go | 28 +++++++++++++++++++++------- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index fd30074..24017fa 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -90,19 +90,19 @@ func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { return response.Data, nil } -type apiSyncUpdateResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data null.String `json:"data"` +type apiChannelStatusResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data []syncedVideo `json:"data"` } type syncedVideo struct { - VideoID string - Published bool - FailureReason string + VideoID string `json:"video_id"` + Published bool `json:"published"` + FailureReason string `json:"failure_reason"` } -func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[string]syncedVideo, error) { +func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) { endpoint := s.ApiURL + "/yt/channel_status" res, _ := http.PostForm(endpoint, url.Values{ @@ -113,7 +113,7 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[ }) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) - var response apiSyncUpdateResponse + var response apiChannelStatusResponse err := json.Unmarshal(body, &response) if err != nil { return nil, err @@ -121,18 +121,12 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[ if !response.Error.IsNull() { return nil, errors.Err(response.Error.String) } - if !response.Data.IsNull() { - if response.Data.String == "ok" { + if response.Data != nil { + if len(response.Data) == 0 { return nil, nil } - var sv []syncedVideo - err := json.Unmarshal([]byte(response.Data.String), &sv) - if err != nil { - return nil, errors.Err("could not parse synced videos") - } - svs := make(map[string]syncedVideo) - for _, v := range sv { + for _, v := range response.Data { svs[v.VideoID] = v } return svs, nil @@ -145,6 +139,12 @@ const ( VideoStatusFailed = "failed" ) +type apiVideoStatusResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data null.String `json:"data"` +} + func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { endpoint := s.ApiURL + "/yt/video_status" @@ -168,7 +168,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st res, _ := http.PostForm(endpoint, vals) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) - var response apiSyncUpdateResponse + var response apiVideoStatusResponse err := json.Unmarshal(body, &response) if err != nil { return err diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 362d115..b1a3ce2 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -21,6 +21,7 @@ import ( "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/util" + "github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" @@ -63,9 +64,9 @@ type Sync struct { daemon *jsonrpc.Client claimAddress string videoDirectory string - //db *redisdb.DB - syncedVideos map[string]syncedVideo - grp *stop.Group + db *redisdb.DB + syncedVideos map[string]syncedVideo + grp *stop.Group mux sync.Mutex wg sync.WaitGroup @@ -109,10 +110,11 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - syncedVideos, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) + syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) if err != nil { return err } + defer func() { if e != nil { //conditions for which a channel shouldn't be marked as failed @@ -123,14 +125,14 @@ func (s *Sync) FullCycle() (e error) { if util.SubstringInSlice(e.Error(), noFailConditions) { return } - _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) e = errors.Prefix(err.Error(), e) } } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) if err != nil { e = err } @@ -181,7 +183,7 @@ func (s *Sync) FullCycle() (e error) { return errors.Wrap(err, 0) } - //s.db = redisdb.New() + s.db = redisdb.New() s.syncedVideos = syncedVideos s.grp = stop.New() s.queue = make(chan video) @@ -502,6 +504,18 @@ func (s *Sync) processVideo(v video) (err error) { return nil } + alreadyPublishedOld, err := s.db.IsPublished(v.ID()) + if err != nil { + return err + } + //TODO: remove this after a few runs + if alreadyPublishedOld && !alreadyPublished { + //seems like something in the migration of blobs didn't go perfectly right so warn about it! + SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s", + s.YoutubeChannelID, v.ID()) + return nil + } + if alreadyPublished { log.Println(v.ID() + " already published") return nil From 630ca87f9d1d31bc94047661ac55dc1516de0131 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Thu, 2 Aug 2018 08:05:06 -0400 Subject: [PATCH 03/12] fix oversights --- ytsync/setup.go | 4 ---- ytsync/ytsync.go | 12 ++++++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ytsync/setup.go b/ytsync/setup.go index f8cc8fe..042895d 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -44,11 +44,7 @@ func (s *Sync) walletSetup() error { return nil } - //numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... - if err != nil { - return err - } log.Debugf("We already published %d videos", numPublished) if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index b1a3ce2..969a658 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -499,16 +499,21 @@ func (s *Sync) processVideo(v video) (err error) { sv, ok := s.syncedVideos[v.ID()] alreadyPublished := ok && sv.Published - if ok && !sv.Published && strings.Contains(sv.FailureReason, "Error extracting sts from embedded url response") { - log.Println(v.ID() + " can't be published") + neverRetryFailures := []string{ + "Error extracting sts from embedded url response", + "the video is too big to sync, skipping for now", + } + if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) { + log.Println(v.ID() + " can't ever be published") return nil } + //TODO: remove this after a few runs... alreadyPublishedOld, err := s.db.IsPublished(v.ID()) if err != nil { return err } - //TODO: remove this after a few runs + //TODO: remove this after a few runs... if alreadyPublishedOld && !alreadyPublished { //seems like something in the migration of blobs didn't go perfectly right so warn about it! SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s", @@ -533,7 +538,6 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } - //err = s.db.SetPublished(v.ID()) if err != nil { return err } From 3956e0dc94c5f8e56ce8fe2f9aaf03a69e312848 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 3 Aug 2018 13:21:42 -0400 Subject: [PATCH 04/12] update synced videos map on progress add locking fix bugs --- ytsync/manager.go | 2 +- ytsync/setup.go | 2 ++ ytsync/ytsync.go | 26 ++++++++++++++++++++------ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 24017fa..35381b8 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -270,7 +270,7 @@ func (s SyncManager) Start() error { } SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) } - SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount) + SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i+1, len(syncs), syncCount) syncCount++ if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { shouldInterruptLoop = true diff --git a/ytsync/setup.go b/ytsync/setup.go index 042895d..f8c6441 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -44,7 +44,9 @@ func (s *Sync) walletSetup() error { return nil } + s.videosMapMux.Lock() numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... + s.videosMapMux.Unlock() log.Debugf("We already published %d videos", numPublished) if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 969a658..5deb71e 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -68,9 +68,20 @@ type Sync struct { syncedVideos map[string]syncedVideo grp *stop.Group - mux sync.Mutex - wg sync.WaitGroup - queue chan video + videosMapMux sync.Mutex + mux sync.Mutex + wg sync.WaitGroup + queue chan video +} + +func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { + s.videosMapMux.Lock() + defer s.videosMapMux.Unlock() + s.syncedVideos[videoID] = syncedVideo{ + VideoID: videoID, + Published: published, + FailureReason: failureReason, + } } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -184,7 +195,9 @@ func (s *Sync) FullCycle() (e error) { } s.db = redisdb.New() + s.videosMapMux.Lock() s.syncedVideos = syncedVideos + s.videosMapMux.Unlock() s.grp = stop.New() s.queue = make(chan video) @@ -343,6 +356,7 @@ func (s *Sync) startWorker(workerNum int) { if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } + s.AppendSyncedVideo(v.ID(), false, err.Error()) } break } @@ -496,7 +510,9 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) + s.videosMapMux.Lock() sv, ok := s.syncedVideos[v.ID()] + s.videosMapMux.Unlock() alreadyPublished := ok && sv.Published neverRetryFailures := []string{ @@ -535,12 +551,10 @@ func (s *Sync) processVideo(v video) (err error) { return err } err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "") - if err != nil { - SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) - } if err != nil { return err } + s.AppendSyncedVideo(v.ID(), true, "") return nil } From 9cfda0df64fc0e12df9e3de92b91f4665285e9f1 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 3 Aug 2018 13:50:17 -0400 Subject: [PATCH 05/12] change utxos confirmation behavior should make startup faster bug fixes --- ytsync/manager.go | 2 +- ytsync/setup.go | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 35381b8..efd67fc 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -256,7 +256,7 @@ func (s SyncManager) Start() error { time.Sleep(5 * time.Minute) } for i, sync := range syncs { - SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount) + SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i+1, len(syncs), syncCount) err := sync.FullCycle() if err != nil { fatalErrors := []string{ diff --git a/ytsync/setup.go b/ytsync/setup.go index f8c6441..def90cd 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -55,7 +55,7 @@ func (s *Sync) walletSetup() error { //TODO: get rid of this as soon as we compute this condition using the database in a more reliable way if numPublished >= numOnSource { - return errors.Err("channel is already up to date") + return nil } minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount if numPublished > numOnSource { @@ -107,14 +107,6 @@ func (s *Sync) ensureEnoughUTXOs() error { return errors.Err("no response") } - if !allUTXOsConfirmed(utxolist) { - log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run - err := s.waitUntilUTXOsConfirmed() - if err != nil { - return err - } - } - target := 40 count := 0 @@ -154,6 +146,12 @@ func (s *Sync) ensureEnoughUTXOs() error { if err != nil { return err } + } else if !allUTXOsConfirmed(utxolist) { + log.Println("Waiting for previous txns to confirm") + err := s.waitUntilUTXOsConfirmed() + if err != nil { + return err + } } return nil From 3a51ea9b76155178d002010c8f765f25c4afc339 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 3 Aug 2018 17:19:36 -0400 Subject: [PATCH 06/12] add function to wait for a new block bad bug fixes remove half broken code used to wait for UTXOs use channel IDs rather than names (skip lbryum) port changes to UCB uploader --- ytsync/manager.go | 7 ++-- ytsync/setup.go | 59 +++++++++++++++------------------- ytsync/sources/ucbVideo.go | 26 +++++++-------- ytsync/sources/youtubeVideo.go | 14 ++++---- ytsync/ytsync.go | 5 +-- 5 files changed, 50 insertions(+), 61 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index efd67fc..0c0e081 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -122,9 +122,6 @@ func (s SyncManager) setChannelStatus(channelID string, status string) (map[stri return nil, errors.Err(response.Error.String) } if response.Data != nil { - if len(response.Data) == 0 { - return nil, nil - } svs := make(map[string]syncedVideo) for _, v := range response.Data { svs[v.VideoID] = v @@ -256,7 +253,7 @@ func (s SyncManager) Start() error { time.Sleep(5 * time.Minute) } for i, sync := range syncs { - SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i+1, len(syncs), syncCount) + SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount) err := sync.FullCycle() if err != nil { fatalErrors := []string{ @@ -270,7 +267,7 @@ func (s SyncManager) Start() error { } SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) } - SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i+1, len(syncs), syncCount) + SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount) syncCount++ if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { shouldInterruptLoop = true diff --git a/ytsync/setup.go b/ytsync/setup.go index def90cd..d3e86ec 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -99,7 +99,6 @@ func (s *Sync) walletSetup() error { } func (s *Sync) ensureEnoughUTXOs() error { - utxolist, err := s.daemon.UTXOList() if err != nil { return err @@ -137,18 +136,13 @@ func (s *Sync) ensureEnoughUTXOs() error { return errors.Err("no response") } - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses") - time.Sleep(wait) - - log.Println("Creating UTXOs and waiting for them to be confirmed") - err = s.waitUntilUTXOsConfirmed() + err = s.waitForNewBlock() if err != nil { return err } } else if !allUTXOsConfirmed(utxolist) { log.Println("Waiting for previous txns to confirm") - err := s.waitUntilUTXOsConfirmed() + err := s.waitForNewBlock() if err != nil { return err } @@ -157,28 +151,31 @@ func (s *Sync) ensureEnoughUTXOs() error { return nil } -func (s *Sync) waitUntilUTXOsConfirmed() error { - origin := time.Now() - for { - r, err := s.daemon.UTXOList() +func (s *Sync) waitForNewBlock() error { + status, err := s.daemon.Status() + if err != nil { + return err + } + + for status.BlockchainStatus.Blocks == 0 || status.BlockchainStatus.BlocksBehind != 0 { + time.Sleep(5 * time.Second) + status, err = s.daemon.Status() if err != nil { return err - } else if r == nil { - return errors.Err("no response") } - - if allUTXOsConfirmed(r) { - return nil - } - if time.Now().After(origin.Add(15 * time.Minute)) { - //lbryum is messing with us or something. restart the daemon - //this could also be a very long block - SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String()) - } - wait := 30 * time.Second - log.Println("Waiting " + wait.String() + "...") - time.Sleep(wait) } + currentBlock := status.BlockchainStatus.Blocks + for i := 0; status.BlockchainStatus.Blocks <= currentBlock; i++ { + if i%3 == 0 { + log.Printf("Waiting for new block (%d)...", currentBlock+1) + } + time.Sleep(10 * time.Second) + status, err = s.daemon.Status() + if err != nil { + return err + } + } + return nil } func (s *Sync) ensureChannelOwnership() error { @@ -196,6 +193,7 @@ func (s *Sync) ensureChannelOwnership() error { isChannelMine := false for _, channel := range *channels { if channel.Name == s.LbryChannelName { + s.lbryChannelID = channel.ClaimID isChannelMine = true } else { return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?") @@ -234,16 +232,11 @@ func (s *Sync) ensureChannelOwnership() error { s.addCredits(channelBidAmount + 0.1) } - _, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) + c, err := s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) if err != nil { return err } - - // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for channel claim to go through") - time.Sleep(wait) - + s.lbryChannelID = c.ClaimID return nil } diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go index 514b676..f7ead6a 100644 --- a/ytsync/sources/ucbVideo.go +++ b/ytsync/sources/ucbVideo.go @@ -170,25 +170,23 @@ func (v ucbVideo) saveThumbnail() error { return err } -func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { +func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { options := jsonrpc.PublishOptions{ - Title: &v.title, - Author: strPtr("UC Berkeley"), - Description: strPtr(v.getAbbrevDescription()), - Language: strPtr("en"), - ClaimAddress: &claimAddress, - Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), - License: strPtr("see description"), - } - - if channelName != "" { - options.ChannelName = &channelName + Title: &v.title, + Author: strPtr("UC Berkeley"), + Description: strPtr(v.getAbbrevDescription()), + Language: strPtr("en"), + ClaimAddress: &claimAddress, + Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), + License: strPtr("see description"), + ChannelID: &channelID, + ChangeAddress: &claimAddress, } return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } -func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) { +func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -202,7 +200,7 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float //} //log.Debugln("Created thumbnail for " + v.id) - summary, err := v.publish(daemon, claimAddress, amount, channelName) + summary, err := v.publish(daemon, claimAddress, amount, channelID) if err != nil { return nil, errors.Prefix("publish error", err) } diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index 0f24419..f712545 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -185,7 +185,10 @@ func (v YoutubeVideo) triggerThumbnailSave() error { func strPtr(s string) *string { return &s } -func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { +func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { + if channelID == "" { + return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? + } options := jsonrpc.PublishOptions{ Title: &v.title, Author: &v.channelTitle, @@ -195,15 +198,12 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), License: strPtr("Copyrighted (contact author)"), ChangeAddress: &claimAddress, + ChannelID: &channelID, } - if channelName != "" { - options.ChannelName = &channelName - } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } -func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) { +func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -227,7 +227,7 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f } log.Debugln("Created thumbnail for " + v.id) - summary, err := v.publish(daemon, claimAddress, amount, channelName) + summary, err := v.publish(daemon, claimAddress, amount, channelID) //delete the video in all cases (and ignore the error) _ = v.delete() if err != nil { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 5deb71e..6a32717 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -67,6 +67,7 @@ type Sync struct { db *redisdb.DB syncedVideos map[string]syncedVideo grp *stop.Group + lbryChannelID string videosMapMux sync.Mutex mux sync.Mutex @@ -352,11 +353,11 @@ func (s *Sync) startWorker(workerNum int) { } SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } + s.AppendSyncedVideo(v.ID(), false, err.Error()) err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } - s.AppendSyncedVideo(v.ID(), false, err.Error()) } break } @@ -546,7 +547,7 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " is old: skipping") return nil } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName, s.Manager.MaxVideoSize) + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) if err != nil { return err } From 688ce0de1c6b75f9bd51a49acfdcd62258b7fcd5 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 8 Aug 2018 05:55:27 -0400 Subject: [PATCH 07/12] fix mysterious bug --- ytsync/setup.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ytsync/setup.go b/ytsync/setup.go index d3e86ec..c75f0fa 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -53,12 +53,8 @@ func (s *Sync) walletSetup() error { numOnSource = uint64(s.Manager.VideosLimit) } - //TODO: get rid of this as soon as we compute this condition using the database in a more reliable way - if numPublished >= numOnSource { - return nil - } minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount - if numPublished > numOnSource { + if numPublished > numOnSource && balance.LessThan(decimal.NewFromFloat(1)) { SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource) minBalance = 1 //since we ended up in this function it means some juice is still needed } From 37c15954c0f39cd5801fe7b3ed6110d303628879 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 8 Aug 2018 18:15:17 -0400 Subject: [PATCH 08/12] small bug fixes --- ytsync/manager.go | 4 ++-- ytsync/ytsync.go | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 0c0e081..654301f 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -253,7 +253,7 @@ func (s SyncManager) Start() error { time.Sleep(5 * time.Minute) } for i, sync := range syncs { - SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount) + SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) err := sync.FullCycle() if err != nil { fatalErrors := []string{ @@ -267,7 +267,7 @@ func (s SyncManager) Start() error { } SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) } - SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount) + SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) syncCount++ if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { shouldInterruptLoop = true diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 6a32717..80463d0 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -132,7 +132,6 @@ func (s *Sync) FullCycle() (e error) { //conditions for which a channel shouldn't be marked as failed noFailConditions := []string{ "this youtube channel is being managed by another server", - "channel is already up to date", } if util.SubstringInSlice(e.Error(), noFailConditions) { return From 58c8e7f5a31b0aa2ca24d1268e456811f4971ca6 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Thu, 9 Aug 2018 20:19:37 -0400 Subject: [PATCH 09/12] bug fixes --- ytsync/setup.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ytsync/setup.go b/ytsync/setup.go index c75f0fa..ccfbbb0 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -153,15 +153,15 @@ func (s *Sync) waitForNewBlock() error { return err } - for status.BlockchainStatus.Blocks == 0 || status.BlockchainStatus.BlocksBehind != 0 { + for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 { time.Sleep(5 * time.Second) status, err = s.daemon.Status() if err != nil { return err } } - currentBlock := status.BlockchainStatus.Blocks - for i := 0; status.BlockchainStatus.Blocks <= currentBlock; i++ { + currentBlock := status.Wallet.Blocks + for i := 0; status.Wallet.Blocks <= currentBlock; i++ { if i%3 == 0 { log.Printf("Waiting for new block (%d)...", currentBlock+1) } From 2cb9082b504df08cd670203e79280bb1c77d0271 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 10 Aug 2018 08:41:21 -0400 Subject: [PATCH 10/12] fix nastiest bug ever --- ytsync/sources/youtubeVideo.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index f712545..9fb091e 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -66,7 +66,7 @@ func (v YoutubeVideo) getFilename() string { name := chunks[0] if len(name) > maxLen { - return name[:maxLen] + name = name[:maxLen] } for _, chunk := range chunks[1:] { From 38c1883dde9c8ee6c844771f89f5a7d2c871d49c Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 14 Aug 2018 10:48:55 -0400 Subject: [PATCH 11/12] use stop group instead of wait group refactoring address reviews --- ytsync/manager.go | 12 ++++---- ytsync/setup.go | 19 +++++++------ ytsync/ytsync.go | 70 +++++++++++++++++++++++------------------------ 3 files changed, 50 insertions(+), 51 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 654301f..9e72d90 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -136,12 +136,6 @@ const ( VideoStatusFailed = "failed" ) -type apiVideoStatusResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data null.String `json:"data"` -} - func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { endpoint := s.ApiURL + "/yt/video_status" @@ -165,7 +159,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st res, _ := http.PostForm(endpoint, vals) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) - var response apiVideoStatusResponse + var response struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data null.String `json:"data"` + } err := json.Unmarshal(body, &response) if err != nil { return err diff --git a/ytsync/setup.go b/ytsync/setup.go index ccfbbb0..7a40ccd 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -14,8 +14,8 @@ import ( func (s *Sync) walletSetup() error { //prevent unnecessary concurrent execution - s.mux.Lock() - defer s.mux.Unlock() + s.walletMux.Lock() + defer s.walletMux.Unlock() err := s.ensureChannelOwnership() if err != nil { return err @@ -30,27 +30,28 @@ func (s *Sync) walletSetup() error { balance := decimal.Decimal(*balanceResp) log.Debugf("Starting balance is %s", balance.String()) - var numOnSource uint64 + var numOnSource int if s.LbryChannelName == "@UCBerkeley" { numOnSource = 10104 } else { - numOnSource, err = s.CountVideos() + n, err := s.CountVideos() if err != nil { return err } + numOnSource = int(n) } log.Debugf("Source channel has %d videos", numOnSource) if numOnSource == 0 { return nil } - s.videosMapMux.Lock() - numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... - s.videosMapMux.Unlock() + s.syncedVideosMux.Lock() + numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... + s.syncedVideosMux.Unlock() log.Debugf("We already published %d videos", numPublished) - if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { - numOnSource = uint64(s.Manager.VideosLimit) + if numOnSource-numPublished > s.Manager.VideosLimit { + numOnSource = s.Manager.VideosLimit } minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 80463d0..7e96371 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -61,23 +61,22 @@ type Sync struct { Refill int Manager *SyncManager - daemon *jsonrpc.Client - claimAddress string - videoDirectory string - db *redisdb.DB - syncedVideos map[string]syncedVideo - grp *stop.Group - lbryChannelID string + daemon *jsonrpc.Client + claimAddress string + videoDirectory string + db *redisdb.DB + syncedVideos map[string]syncedVideo + syncedVideosMux *sync.Mutex + grp *stop.Group + lbryChannelID string - videosMapMux sync.Mutex - mux sync.Mutex - wg sync.WaitGroup - queue chan video + walletMux *sync.Mutex + queue chan video } func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { - s.videosMapMux.Lock() - defer s.videosMapMux.Unlock() + s.syncedVideosMux.Lock() + defer s.syncedVideosMux.Unlock() s.syncedVideos[videoID] = syncedVideo{ VideoID: videoID, Published: published, @@ -122,10 +121,25 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } + s.syncedVideosMux = &sync.Mutex{} + s.walletMux = &sync.Mutex{} + s.db = redisdb.New() + s.grp = stop.New() + s.queue = make(chan video) + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + go func() { + <-interruptChan + log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") + s.grp.Stop() + }() syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) if err != nil { return err } + s.syncedVideosMux.Lock() + s.syncedVideos = syncedVideos + s.syncedVideosMux.Unlock() defer func() { if e != nil { @@ -194,21 +208,6 @@ func (s *Sync) FullCycle() (e error) { return errors.Wrap(err, 0) } - s.db = redisdb.New() - s.videosMapMux.Lock() - s.syncedVideos = syncedVideos - s.videosMapMux.Unlock() - s.grp = stop.New() - s.queue = make(chan video) - - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - go func() { - <-interruptChan - log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") - s.grp.Stop() - }() - log.Printf("Starting daemon") err = startDaemonViaSystemd() if err != nil { @@ -263,7 +262,11 @@ func (s *Sync) doSync() error { } for i := 0; i < s.ConcurrentVideos; i++ { - go s.startWorker(i) + s.grp.Add(1) + go func() { + defer s.grp.Done() + s.startWorker(i) + }() } if s.LbryChannelName == "@UCBerkeley" { @@ -272,14 +275,11 @@ func (s *Sync) doSync() error { err = s.enqueueYoutubeVideos() } close(s.queue) - s.wg.Wait() + s.grp.Wait() return err } func (s *Sync) startWorker(workerNum int) { - s.wg.Add(1) - defer s.wg.Done() - var v video var more bool @@ -510,9 +510,9 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - s.videosMapMux.Lock() + s.syncedVideosMux.Lock() sv, ok := s.syncedVideos[v.ID()] - s.videosMapMux.Unlock() + s.syncedVideosMux.Unlock() alreadyPublished := ok && sv.Published neverRetryFailures := []string{ From 86adadb8a190042313590a5c25ad7c1a119495c0 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 17 Aug 2018 14:05:39 -0400 Subject: [PATCH 12/12] don't publish when running out of space --- ytsync/ytsync.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 7e96371..c44d9c6 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -316,6 +316,7 @@ func (s *Sync) startWorker(workerNum int) { "no space left on device", "NotEnoughFunds", "Cannot publish using channel", + "more than 90% of the space has been used.", } if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { s.grp.Stop() @@ -546,6 +547,10 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " is old: skipping") return nil } + err = s.Manager.checkUsedSpace() + if err != nil { + return err + } summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) if err != nil { return err