remove redisDB dependency #25

Merged
nikooo777 merged 12 commits from use_video_statuses into master 2018-08-20 13:56:26 +02:00
3 changed files with 48 additions and 19 deletions
Showing only changes of commit c428b5ef77 - Show all commits

View file

@ -96,7 +96,13 @@ type apiSyncUpdateResponse struct {
Data null.String `json:"data"`
}
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error {
type syncedVideo struct {
VideoID string
Published bool
FailureReason string
}
func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[string]syncedVideo, error) {
endpoint := s.ApiURL + "/yt/channel_status"
res, _ := http.PostForm(endpoint, url.Values{
@ -110,15 +116,28 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error
var response apiSyncUpdateResponse
err := json.Unmarshal(body, &response)
if err != nil {
return err
return nil, err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
return nil, errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
if !response.Data.IsNull() {
if response.Data.String == "ok" {
return nil, nil
}
var sv []syncedVideo
err := json.Unmarshal([]byte(response.Data.String), &sv)
if err != nil {
return nil, errors.Err("could not parse synced videos")
}
svs := make(map[string]syncedVideo)
for _, v := range sv {
svs[v.VideoID] = v
}
return svs, nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (

View file

@ -44,7 +44,8 @@ func (s *Sync) walletSetup() error {
return nil
}
tiger5226 commented 2018-08-17 01:06:12 +02:00 (Migrated from github.com)
Review

why change this to an int? It better to be more specific than more general.

why change this to an int? It better to be more specific than more general.
nikooo777 commented 2018-08-20 13:46:43 +02:00 (Migrated from github.com)
Review

Grins previous review outlined a mess with casts here and there to make simple math.
I changed everything to int as it's reasonable for the values they represent.

Grins previous review outlined a mess with casts here and there to make simple math. I changed everything to int as it's reasonable for the values they represent.
numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName)
//numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName)
tiger5226 commented 2018-08-17 01:07:02 +02:00 (Migrated from github.com)
Review

what is this?! not part of the PR but this is not good to have in the code base.

what is this?! not part of the PR but this is not good to have in the code base.
nikooo777 commented 2018-08-20 13:52:54 +02:00 (Migrated from github.com)
Review

I can explain the berkeley stuff to you via DM, it's all good as it will be eventually removed from here. Not worth changing now

I can explain the berkeley stuff to you via DM, it's all good as it will be eventually removed from here. Not worth changing now
numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones...
if err != nil {
return err
}
@ -53,6 +54,11 @@ func (s *Sync) walletSetup() error {
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {
numOnSource = uint64(s.Manager.VideosLimit)
}
//TODO: get rid of this as soon as we compute this condition using the database in a more reliable way
if numPublished >= numOnSource {
return errors.Err("channel is already up to date")
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource {
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)

View file

@ -21,7 +21,6 @@ import (
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus"
@ -64,9 +63,9 @@ type Sync struct {
daemon *jsonrpc.Client
claimAddress string
lyoshenka commented 2018-08-09 16:31:56 +02:00 (Migrated from github.com)
Review

if you have multiple mutexes, you can't call one of them just mux. its not clear what that's for. it could be something like walletSetupMux. though once you name it that, it becomes clear that something might be wrong with wrapping the whole wallet setup in a single mutex. does the whole thing actually need to be locked?

the mutexes should be pointers, so new copies are not created if the Sync struct is copied

its clearer if you put the mutex right above the thing the mutex is locking, and leave newlines on either side. so in this example, upt videosMapMux right above syncedVideos. and then maybe rename it to syncedVideosMux, like so:

...other vars...

syncedVideosMux *sync.Mutex
syncedVideos    map[string]syncedVideo

...other vars...
if you have multiple mutexes, you can't call one of them just `mux`. its not clear what that's for. it could be something like `walletSetupMux`. though once you name it that, it becomes clear that something might be wrong with wrapping the whole wallet setup in a single mutex. does the whole thing *actually* need to be locked? the mutexes should be pointers, so new copies are not created if the Sync struct is copied its clearer if you put the mutex right above the thing the mutex is locking, and leave newlines on either side. so in this example, upt `videosMapMux` right above `syncedVideos`. and then maybe rename it to `syncedVideosMux`, like so: ``` ...other vars... syncedVideosMux *sync.Mutex syncedVideos map[string]syncedVideo ...other vars... ```
lyoshenka commented 2018-08-09 16:32:30 +02:00 (Migrated from github.com)
Review

this should also be a pointer. but more importantly, you don't need this if you already have a stop.Group. stop.Group works as a combo WaitGroup + channel that will be closed to indicate stopping (and can be closed safely multiple times). so use grp.Add() and grp.Wait() instead

this should also be a pointer. but more importantly, you don't need this if you already have a stop.Group. stop.Group works as a combo WaitGroup + channel that will be closed to indicate stopping (and can be closed safely multiple times). so use `grp.Add()` and `grp.Wait()` instead
lyoshenka commented 2018-08-09 16:38:35 +02:00 (Migrated from github.com)
Review

it won't let me comment below, so I'm commenting here:

you call Add() and Done() inside startWorker(workerNum int), but the correct pattern is to make those calls outside the function. startWorker() doesn't know if its being run asynchronously or not. you only need a waitgroup if it is. there are also subtle concurrency issues with calling it inside the function. so the right way to go is to remove Add and Done from inside startWorker, and do this:

	for i := 0; i < s.ConcurrentVideos; i++ {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			s.startWorker(i)
		}()
	}
it won't let me comment below, so I'm commenting here: you call `Add()` and `Done()` inside `startWorker(workerNum int)`, but the correct pattern is to make those calls outside the function. startWorker() doesn't know if its being run asynchronously or not. you only need a waitgroup if it is. there are also subtle concurrency issues with calling it inside the function. so the right way to go is to remove Add and Done from inside startWorker, and do this: ``` for i := 0; i < s.ConcurrentVideos; i++ { s.wg.Add(1) go func() { defer s.wg.Done() s.startWorker(i) }() } ```
lyoshenka commented 2018-08-09 16:45:11 +02:00 (Migrated from github.com)
Review

you can use a read/write lock to lock the map for reading when you read it, and writing when you write to it. this lets multiple threads read the data at once, which is safe and blocks less. use *sync.RWMutex instead, and call mux.RLock() and mux.RUnlock() when you're only reading from the variable. leave mux.Lock() for writing

you can use a read/write lock to lock the map for reading when you read it, and writing when you write to it. this lets multiple threads read the data at once, which is safe and blocks less. use `*sync.RWMutex` instead, and call `mux.RLock()` and `mux.RUnlock()` when you're only reading from the variable. leave `mux.Lock()` for writing
nikooo777 commented 2018-08-09 18:30:21 +02:00 (Migrated from github.com)
Review

I need to look into that, this is from your original code and I don't remember ever changing it. Thanks for the pointers there.

I need to look into that, this is from your original code and I don't remember ever changing it. Thanks for the pointers there.
nikooo777 commented 2018-08-09 18:32:46 +02:00 (Migrated from github.com)
Review

Will rename and move the mutexes.
The reason I didn't use read/write locks is that i don't want the application to read when it's being written to, plus the locks are held for a very very short time so there would be no noticeable improvement, only a higher risk of race conditions happening during wallet refills.

Will rename and move the mutexes. The reason I didn't use read/write locks is that i don't want the application to read when it's being written to, plus the locks are held for a very very short time so there would be no noticeable improvement, only a higher risk of race conditions happening during wallet refills.
nikooo777 commented 2018-08-14 16:37:24 +02:00 (Migrated from github.com)
Review

I looked more into why we need to lock the whole walletSetup function. The reason I'm doing that is to avoid multiple threads from refilling the wallet concurrently. I don't think I can easily break up the the function to lock fewer lines of code. I think it's fair to leave it like that.

I removed the wait group in favor of the stop group

I looked more into why we need to lock the whole walletSetup function. The reason I'm doing that is to avoid multiple threads from refilling the wallet concurrently. I don't think I can easily break up the the function to lock fewer lines of code. I think it's fair to leave it like that. I removed the wait group in favor of the stop group
lyoshenka commented 2018-08-18 15:28:20 +02:00 (Migrated from github.com)
Review

a read/write lock does not allow reading during writing. thats the point of every lock. what it does allow is multiple concurrent reads. when you do RLock, others can RLock at the same time. when you Lock (for writing), no one's allowed to Lock or RLock at the same time.

a read/write lock does not allow reading during writing. thats the point of every lock. what it does allow is multiple concurrent reads. when you do RLock, others can RLock at the same time. when you Lock (for writing), no one's allowed to Lock or RLock at the same time.
nikooo777 commented 2018-08-20 13:03:09 +02:00 (Migrated from github.com)
Review

Oh yes, you're right, not sure what was going in my mind. I'll swap that

Oh yes, you're right, not sure what was going in my mind. I'll swap that
videoDirectory string
db *redisdb.DB
grp *stop.Group
//db *redisdb.DB
syncedVideos map[string]syncedVideo
grp *stop.Group
mux sync.Mutex
wg sync.WaitGroup
@ -110,7 +109,7 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
syncedVideos, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
if err != nil {
return err
}
@ -119,18 +118,19 @@ func (s *Sync) FullCycle() (e error) {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
"channel is already up to date",
}
if util.SubstringInSlice(e.Error(), noFailConditions) {
return
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
_, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e)
}
} else if !s.IsInterrupted() {
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
_, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
e = err
}
@ -181,7 +181,8 @@ func (s *Sync) FullCycle() (e error) {
return errors.Wrap(err, 0)
}
s.db = redisdb.New()
//s.db = redisdb.New()
s.syncedVideos = syncedVideos
s.grp = stop.New()
s.queue = make(chan video)
@ -493,9 +494,12 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now())
alreadyPublished, err := s.db.IsPublished(v.ID())
if err != nil {
return err
sv, ok := s.syncedVideos[v.ID()]
alreadyPublished := ok && sv.Published
if ok && !sv.Published && strings.Contains(sv.FailureReason, "Error extracting sts from embedded url response") {
log.Println(v.ID() + " can't be published")
return nil
}
if alreadyPublished {
@ -515,7 +519,7 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
err = s.db.SetPublished(v.ID())
//err = s.db.SetPublished(v.ID())
if err != nil {
return err
}