use stop group instead of wait group
refactoring address reviews
This commit is contained in:
parent
2cb9082b50
commit
38c1883dde
3 changed files with 50 additions and 51 deletions
|
@ -136,12 +136,6 @@ const (
|
|||
VideoStatusFailed = "failed"
|
||||
)
|
||||
|
||||
type apiVideoStatusResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data null.String `json:"data"`
|
||||
}
|
||||
|
||||
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error {
|
||||
endpoint := s.ApiURL + "/yt/video_status"
|
||||
|
||||
|
@ -165,7 +159,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
|
|||
res, _ := http.PostForm(endpoint, vals)
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response apiVideoStatusResponse
|
||||
var response struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data null.String `json:"data"`
|
||||
}
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -14,8 +14,8 @@ import (
|
|||
|
||||
func (s *Sync) walletSetup() error {
|
||||
//prevent unnecessary concurrent execution
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
s.walletMux.Lock()
|
||||
defer s.walletMux.Unlock()
|
||||
err := s.ensureChannelOwnership()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -30,27 +30,28 @@ func (s *Sync) walletSetup() error {
|
|||
balance := decimal.Decimal(*balanceResp)
|
||||
log.Debugf("Starting balance is %s", balance.String())
|
||||
|
||||
var numOnSource uint64
|
||||
var numOnSource int
|
||||
if s.LbryChannelName == "@UCBerkeley" {
|
||||
numOnSource = 10104
|
||||
} else {
|
||||
numOnSource, err = s.CountVideos()
|
||||
n, err := s.CountVideos()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
numOnSource = int(n)
|
||||
}
|
||||
log.Debugf("Source channel has %d videos", numOnSource)
|
||||
if numOnSource == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.videosMapMux.Lock()
|
||||
numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones...
|
||||
s.videosMapMux.Unlock()
|
||||
s.syncedVideosMux.Lock()
|
||||
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
|
||||
s.syncedVideosMux.Unlock()
|
||||
log.Debugf("We already published %d videos", numPublished)
|
||||
|
||||
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {
|
||||
numOnSource = uint64(s.Manager.VideosLimit)
|
||||
if numOnSource-numPublished > s.Manager.VideosLimit {
|
||||
numOnSource = s.Manager.VideosLimit
|
||||
}
|
||||
|
||||
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
|
||||
|
|
|
@ -61,23 +61,22 @@ type Sync struct {
|
|||
Refill int
|
||||
Manager *SyncManager
|
||||
|
||||
daemon *jsonrpc.Client
|
||||
claimAddress string
|
||||
videoDirectory string
|
||||
db *redisdb.DB
|
||||
syncedVideos map[string]syncedVideo
|
||||
grp *stop.Group
|
||||
lbryChannelID string
|
||||
daemon *jsonrpc.Client
|
||||
claimAddress string
|
||||
videoDirectory string
|
||||
db *redisdb.DB
|
||||
syncedVideos map[string]syncedVideo
|
||||
syncedVideosMux *sync.Mutex
|
||||
grp *stop.Group
|
||||
lbryChannelID string
|
||||
|
||||
videosMapMux sync.Mutex
|
||||
mux sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
queue chan video
|
||||
walletMux *sync.Mutex
|
||||
queue chan video
|
||||
}
|
||||
|
||||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
|
||||
s.videosMapMux.Lock()
|
||||
defer s.videosMapMux.Unlock()
|
||||
s.syncedVideosMux.Lock()
|
||||
defer s.syncedVideosMux.Unlock()
|
||||
s.syncedVideos[videoID] = syncedVideo{
|
||||
VideoID: videoID,
|
||||
Published: published,
|
||||
|
@ -122,10 +121,25 @@ func (s *Sync) FullCycle() (e error) {
|
|||
if s.YoutubeChannelID == "" {
|
||||
return errors.Err("channel ID not provided")
|
||||
}
|
||||
s.syncedVideosMux = &sync.Mutex{}
|
||||
s.walletMux = &sync.Mutex{}
|
||||
s.db = redisdb.New()
|
||||
s.grp = stop.New()
|
||||
s.queue = make(chan video)
|
||||
interruptChan := make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-interruptChan
|
||||
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
||||
s.grp.Stop()
|
||||
}()
|
||||
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.syncedVideosMux.Lock()
|
||||
s.syncedVideos = syncedVideos
|
||||
s.syncedVideosMux.Unlock()
|
||||
|
||||
defer func() {
|
||||
if e != nil {
|
||||
|
@ -194,21 +208,6 @@ func (s *Sync) FullCycle() (e error) {
|
|||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
s.db = redisdb.New()
|
||||
s.videosMapMux.Lock()
|
||||
s.syncedVideos = syncedVideos
|
||||
s.videosMapMux.Unlock()
|
||||
s.grp = stop.New()
|
||||
s.queue = make(chan video)
|
||||
|
||||
interruptChan := make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-interruptChan
|
||||
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
||||
s.grp.Stop()
|
||||
}()
|
||||
|
||||
log.Printf("Starting daemon")
|
||||
err = startDaemonViaSystemd()
|
||||
if err != nil {
|
||||
|
@ -263,7 +262,11 @@ func (s *Sync) doSync() error {
|
|||
}
|
||||
|
||||
for i := 0; i < s.ConcurrentVideos; i++ {
|
||||
go s.startWorker(i)
|
||||
s.grp.Add(1)
|
||||
go func() {
|
||||
defer s.grp.Done()
|
||||
s.startWorker(i)
|
||||
}()
|
||||
}
|
||||
|
||||
if s.LbryChannelName == "@UCBerkeley" {
|
||||
|
@ -272,14 +275,11 @@ func (s *Sync) doSync() error {
|
|||
err = s.enqueueYoutubeVideos()
|
||||
}
|
||||
close(s.queue)
|
||||
s.wg.Wait()
|
||||
s.grp.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Sync) startWorker(workerNum int) {
|
||||
s.wg.Add(1)
|
||||
defer s.wg.Done()
|
||||
|
||||
var v video
|
||||
var more bool
|
||||
|
||||
|
@ -510,9 +510,9 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
log.Println(v.ID() + " took " + time.Since(start).String())
|
||||
}(time.Now())
|
||||
|
||||
s.videosMapMux.Lock()
|
||||
s.syncedVideosMux.Lock()
|
||||
sv, ok := s.syncedVideos[v.ID()]
|
||||
s.videosMapMux.Unlock()
|
||||
s.syncedVideosMux.Unlock()
|
||||
alreadyPublished := ok && sv.Published
|
||||
|
||||
neverRetryFailures := []string{
|
||||
|
|
Loading…
Reference in a new issue