remove redis dependency

use video statuses
This commit is contained in:
Niko Storni 2018-07-30 19:19:12 -04:00
parent 8d2f942937
commit dfb55cecb5
3 changed files with 48 additions and 19 deletions

View file

@ -96,7 +96,13 @@ type apiSyncUpdateResponse struct {
Data null.String `json:"data"` Data null.String `json:"data"`
} }
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error { type syncedVideo struct {
VideoID string
Published bool
FailureReason string
}
func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[string]syncedVideo, error) {
endpoint := s.ApiURL + "/yt/channel_status" endpoint := s.ApiURL + "/yt/channel_status"
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
@ -110,15 +116,28 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error
var response apiSyncUpdateResponse var response apiSyncUpdateResponse
err := json.Unmarshal(body, &response) err := json.Unmarshal(body, &response)
if err != nil { if err != nil {
return err return nil, err
} }
if !response.Error.IsNull() { if !response.Error.IsNull() {
return errors.Err(response.Error.String) return nil, errors.Err(response.Error.String)
} }
if !response.Data.IsNull() && response.Data.String == "ok" { if !response.Data.IsNull() {
return nil if response.Data.String == "ok" {
return nil, nil
} }
return errors.Err("invalid API response. Status code: %d", res.StatusCode) var sv []syncedVideo
err := json.Unmarshal([]byte(response.Data.String), &sv)
if err != nil {
return nil, errors.Err("could not parse synced videos")
}
svs := make(map[string]syncedVideo)
for _, v := range sv {
svs[v.VideoID] = v
}
return svs, nil
}
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
} }
const ( const (

View file

@ -44,7 +44,8 @@ func (s *Sync) walletSetup() error {
return nil return nil
} }
numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) //numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName)
numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones...
if err != nil { if err != nil {
return err return err
} }
@ -53,6 +54,11 @@ func (s *Sync) walletSetup() error {
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {
numOnSource = uint64(s.Manager.VideosLimit) numOnSource = uint64(s.Manager.VideosLimit)
} }
//TODO: get rid of this as soon as we compute this condition using the database in a more reliable way
if numPublished >= numOnSource {
return errors.Err("channel is already up to date")
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource { if numPublished > numOnSource {
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource) SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)

View file

@ -21,7 +21,6 @@ import (
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources" "github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps" "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -64,8 +63,8 @@ type Sync struct {
daemon *jsonrpc.Client daemon *jsonrpc.Client
claimAddress string claimAddress string
videoDirectory string videoDirectory string
db *redisdb.DB //db *redisdb.DB
syncedVideos map[string]syncedVideo
grp *stop.Group grp *stop.Group
mux sync.Mutex mux sync.Mutex
@ -110,7 +109,7 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" { if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided") return errors.Err("channel ID not provided")
} }
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) syncedVideos, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
if err != nil { if err != nil {
return err return err
} }
@ -119,18 +118,19 @@ func (s *Sync) FullCycle() (e error) {
//conditions for which a channel shouldn't be marked as failed //conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{ noFailConditions := []string{
"this youtube channel is being managed by another server", "this youtube channel is being managed by another server",
"channel is already up to date",
} }
if util.SubstringInSlice(e.Error(), noFailConditions) { if util.SubstringInSlice(e.Error(), noFailConditions) {
return return
} }
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err) err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e) e = errors.Prefix(err.Error(), e)
} }
} else if !s.IsInterrupted() { } else if !s.IsInterrupted() {
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
if err != nil { if err != nil {
e = err e = err
} }
@ -181,7 +181,8 @@ func (s *Sync) FullCycle() (e error) {
return errors.Wrap(err, 0) return errors.Wrap(err, 0)
} }
s.db = redisdb.New() //s.db = redisdb.New()
s.syncedVideos = syncedVideos
s.grp = stop.New() s.grp = stop.New()
s.queue = make(chan video) s.queue = make(chan video)
@ -493,9 +494,12 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String()) log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now()) }(time.Now())
alreadyPublished, err := s.db.IsPublished(v.ID()) sv, ok := s.syncedVideos[v.ID()]
if err != nil { alreadyPublished := ok && sv.Published
return err
if ok && !sv.Published && strings.Contains(sv.FailureReason, "Error extracting sts from embedded url response") {
log.Println(v.ID() + " can't be published")
return nil
} }
if alreadyPublished { if alreadyPublished {
@ -515,7 +519,7 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }
err = s.db.SetPublished(v.ID()) //err = s.db.SetPublished(v.ID())
if err != nil { if err != nil {
return err return err
} }