Merge pull request #25 from lbryio/use_video_statuses

remove redisDB dependency
This commit is contained in:
Niko 2018-08-20 13:56:25 +02:00 committed by GitHub
commit 1d31c57786
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 166 additions and 121 deletions

View file

@ -90,13 +90,19 @@ func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
return response.Data, nil
}
type apiSyncUpdateResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []syncedVideo `json:"data"`
}
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error {
type syncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
}
func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) {
endpoint := s.ApiURL + "/yt/channel_status"
res, _ := http.PostForm(endpoint, url.Values{
@ -107,18 +113,22 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiSyncUpdateResponse
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return err
return nil, err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
return nil, errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
if response.Data != nil {
svs := make(map[string]syncedVideo)
for _, v := range response.Data {
svs[v.VideoID] = v
}
return svs, nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
@ -149,7 +159,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiSyncUpdateResponse
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return err
@ -237,7 +251,7 @@ func (s SyncManager) Start() error {
time.Sleep(5 * time.Minute)
}
for i, sync := range syncs {
SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount)
SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
err := sync.FullCycle()
if err != nil {
fatalErrors := []string{
@ -251,7 +265,7 @@ func (s SyncManager) Start() error {
}
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
}
SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount)
SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
syncCount++
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
shouldInterruptLoop = true

View file

@ -14,8 +14,8 @@ import (
func (s *Sync) walletSetup() error {
//prevent unnecessary concurrent execution
s.mux.Lock()
defer s.mux.Unlock()
s.walletMux.Lock()
defer s.walletMux.Unlock()
err := s.ensureChannelOwnership()
if err != nil {
return err
@ -30,31 +30,32 @@ func (s *Sync) walletSetup() error {
balance := decimal.Decimal(*balanceResp)
log.Debugf("Starting balance is %s", balance.String())
var numOnSource uint64
var numOnSource int
if s.LbryChannelName == "@UCBerkeley" {
numOnSource = 10104
} else {
numOnSource, err = s.CountVideos()
n, err := s.CountVideos()
if err != nil {
return err
}
numOnSource = int(n)
}
log.Debugf("Source channel has %d videos", numOnSource)
if numOnSource == 0 {
return nil
}
numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName)
if err != nil {
return err
}
s.syncedVideosMux.Lock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.Unlock()
log.Debugf("We already published %d videos", numPublished)
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {
numOnSource = uint64(s.Manager.VideosLimit)
if numOnSource-numPublished > s.Manager.VideosLimit {
numOnSource = s.Manager.VideosLimit
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource {
if numPublished > numOnSource && balance.LessThan(decimal.NewFromFloat(1)) {
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
minBalance = 1 //since we ended up in this function it means some juice is still needed
}
@ -95,7 +96,6 @@ func (s *Sync) walletSetup() error {
}
func (s *Sync) ensureEnoughUTXOs() error {
utxolist, err := s.daemon.UTXOList()
if err != nil {
return err
@ -103,14 +103,6 @@ func (s *Sync) ensureEnoughUTXOs() error {
return errors.Err("no response")
}
if !allUTXOsConfirmed(utxolist) {
log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run
err := s.waitUntilUTXOsConfirmed()
if err != nil {
return err
}
}
target := 40
count := 0
@ -141,12 +133,13 @@ func (s *Sync) ensureEnoughUTXOs() error {
return errors.Err("no response")
}
wait := 15 * time.Second
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses")
time.Sleep(wait)
log.Println("Creating UTXOs and waiting for them to be confirmed")
err = s.waitUntilUTXOsConfirmed()
err = s.waitForNewBlock()
if err != nil {
return err
}
} else if !allUTXOsConfirmed(utxolist) {
log.Println("Waiting for previous txns to confirm")
err := s.waitForNewBlock()
if err != nil {
return err
}
@ -155,28 +148,31 @@ func (s *Sync) ensureEnoughUTXOs() error {
return nil
}
func (s *Sync) waitUntilUTXOsConfirmed() error {
origin := time.Now()
for {
r, err := s.daemon.UTXOList()
func (s *Sync) waitForNewBlock() error {
status, err := s.daemon.Status()
if err != nil {
return err
}
for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 {
time.Sleep(5 * time.Second)
status, err = s.daemon.Status()
if err != nil {
return err
} else if r == nil {
return errors.Err("no response")
}
if allUTXOsConfirmed(r) {
return nil
}
if time.Now().After(origin.Add(15 * time.Minute)) {
//lbryum is messing with us or something. restart the daemon
//this could also be a very long block
SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String())
}
wait := 30 * time.Second
log.Println("Waiting " + wait.String() + "...")
time.Sleep(wait)
}
currentBlock := status.Wallet.Blocks
for i := 0; status.Wallet.Blocks <= currentBlock; i++ {
if i%3 == 0 {
log.Printf("Waiting for new block (%d)...", currentBlock+1)
}
time.Sleep(10 * time.Second)
status, err = s.daemon.Status()
if err != nil {
return err
}
}
return nil
}
func (s *Sync) ensureChannelOwnership() error {
@ -194,6 +190,7 @@ func (s *Sync) ensureChannelOwnership() error {
isChannelMine := false
for _, channel := range *channels {
if channel.Name == s.LbryChannelName {
s.lbryChannelID = channel.ClaimID
isChannelMine = true
} else {
return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?")
@ -232,16 +229,11 @@ func (s *Sync) ensureChannelOwnership() error {
s.addCredits(channelBidAmount + 0.1)
}
_, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount)
c, err := s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount)
if err != nil {
return err
}
// niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through"
wait := 15 * time.Second
log.Println("Waiting " + wait.String() + " for channel claim to go through")
time.Sleep(wait)
s.lbryChannelID = c.ClaimID
return nil
}

View file

@ -170,25 +170,23 @@ func (v ucbVideo) saveThumbnail() error {
return err
}
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: strPtr("UC Berkeley"),
Description: strPtr(v.getAbbrevDescription()),
Language: strPtr("en"),
ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("see description"),
}
if channelName != "" {
options.ChannelName = &channelName
Title: &v.title,
Author: strPtr("UC Berkeley"),
Description: strPtr(v.getAbbrevDescription()),
Language: strPtr("en"),
ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("see description"),
ChannelID: &channelID,
ChangeAddress: &claimAddress,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
}
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) {
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
@ -202,7 +200,7 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float
//}
//log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelName)
summary, err := v.publish(daemon, claimAddress, amount, channelID)
if err != nil {
return nil, errors.Prefix("publish error", err)
}

View file

@ -66,7 +66,7 @@ func (v YoutubeVideo) getFilename() string {
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
name = name[:maxLen]
}
for _, chunk := range chunks[1:] {
@ -185,7 +185,10 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
}
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: &v.channelTitle,
@ -195,15 +198,12 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("Copyrighted (contact author)"),
ChangeAddress: &claimAddress,
ChannelID: &channelID,
}
if channelName != "" {
options.ChannelName = &channelName
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
}
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) {
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
@ -227,7 +227,7 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
}
log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelName)
summary, err := v.publish(daemon, claimAddress, amount, channelID)
//delete the video in all cases (and ignore the error)
_ = v.delete()
if err != nil {

View file

@ -61,16 +61,27 @@ type Sync struct {
Refill int
Manager *SyncManager
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
db *redisdb.DB
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
db *redisdb.DB
syncedVideos map[string]syncedVideo
syncedVideosMux *sync.Mutex
grp *stop.Group
lbryChannelID string
grp *stop.Group
walletMux *sync.Mutex
queue chan video
}
mux sync.Mutex
wg sync.WaitGroup
queue chan video
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{
VideoID: videoID,
Published: published,
FailureReason: failureReason,
}
}
// SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -110,10 +121,26 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
s.syncedVideosMux = &sync.Mutex{}
s.walletMux = &sync.Mutex{}
s.db = redisdb.New()
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.syncedVideosMux.Unlock()
defer func() {
if e != nil {
//conditions for which a channel shouldn't be marked as failed
@ -123,14 +150,14 @@ func (s *Sync) FullCycle() (e error) {
if util.SubstringInSlice(e.Error(), noFailConditions) {
return
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e)
}
} else if !s.IsInterrupted() {
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
e = err
}
@ -181,18 +208,6 @@ func (s *Sync) FullCycle() (e error) {
return errors.Wrap(err, 0)
}
s.db = redisdb.New()
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
log.Printf("Starting daemon")
err = startDaemonViaSystemd()
if err != nil {
@ -247,7 +262,11 @@ func (s *Sync) doSync() error {
}
for i := 0; i < s.ConcurrentVideos; i++ {
go s.startWorker(i)
s.grp.Add(1)
go func() {
defer s.grp.Done()
s.startWorker(i)
}()
}
if s.LbryChannelName == "@UCBerkeley" {
@ -256,14 +275,11 @@ func (s *Sync) doSync() error {
err = s.enqueueYoutubeVideos()
}
close(s.queue)
s.wg.Wait()
s.grp.Wait()
return err
}
func (s *Sync) startWorker(workerNum int) {
s.wg.Add(1)
defer s.wg.Done()
var v video
var more bool
@ -300,6 +316,7 @@ func (s *Sync) startWorker(workerNum int) {
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
"more than 90% of the space has been used.",
}
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
s.grp.Stop()
@ -336,6 +353,7 @@ func (s *Sync) startWorker(workerNum int) {
}
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error())
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
@ -493,10 +511,32 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now())
alreadyPublished, err := s.db.IsPublished(v.ID())
s.syncedVideosMux.Lock()
sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.Unlock()
alreadyPublished := ok && sv.Published
neverRetryFailures := []string{
"Error extracting sts from embedded url response",
"the video is too big to sync, skipping for now",
}
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) {
log.Println(v.ID() + " can't ever be published")
return nil
}
//TODO: remove this after a few runs...
alreadyPublishedOld, err := s.db.IsPublished(v.ID())
if err != nil {
return err
}
//TODO: remove this after a few runs...
if alreadyPublishedOld && !alreadyPublished {
//seems like something in the migration of blobs didn't go perfectly right so warn about it!
SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s",
s.YoutubeChannelID, v.ID())
return nil
}
if alreadyPublished {
log.Println(v.ID() + " already published")
@ -507,18 +547,19 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " is old: skipping")
return nil
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName, s.Manager.MaxVideoSize)
err = s.Manager.checkUsedSpace()
if err != nil {
return err
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
if err != nil {
return err
}
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
err = s.db.SetPublished(v.ID())
if err != nil {
return err
}
s.AppendSyncedVideo(v.ID(), true, "")
return nil
}