remove redisDB dependency #25
|
@ -90,13 +90,19 @@ func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
|
||||||
return response.Data, nil
|
return response.Data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type apiSyncUpdateResponse struct {
|
type apiChannelStatusResponse struct {
|
||||||
Success bool `json:"success"`
|
Success bool `json:"success"`
|
||||||
Error null.String `json:"error"`
|
Error null.String `json:"error"`
|
||||||
Data null.String `json:"data"`
|
Data []syncedVideo `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error {
|
type syncedVideo struct {
|
||||||
|
VideoID string `json:"video_id"`
|
||||||
|
Published bool `json:"published"`
|
||||||
|
FailureReason string `json:"failure_reason"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) {
|
||||||
endpoint := s.ApiURL + "/yt/channel_status"
|
endpoint := s.ApiURL + "/yt/channel_status"
|
||||||
|
|
||||||
res, _ := http.PostForm(endpoint, url.Values{
|
res, _ := http.PostForm(endpoint, url.Values{
|
||||||
|
@ -107,18 +113,22 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error
|
||||||
})
|
})
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
var response apiSyncUpdateResponse
|
var response apiChannelStatusResponse
|
||||||
err := json.Unmarshal(body, &response)
|
err := json.Unmarshal(body, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !response.Error.IsNull() {
|
if !response.Error.IsNull() {
|
||||||
return errors.Err(response.Error.String)
|
return nil, errors.Err(response.Error.String)
|
||||||
}
|
}
|
||||||
if !response.Data.IsNull() && response.Data.String == "ok" {
|
if response.Data != nil {
|
||||||
return nil
|
svs := make(map[string]syncedVideo)
|
||||||
|
for _, v := range response.Data {
|
||||||
|
svs[v.VideoID] = v
|
||||||
|
}
|
||||||
|
return svs, nil
|
||||||
}
|
}
|
||||||
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -149,7 +159,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
|
||||||
res, _ := http.PostForm(endpoint, vals)
|
res, _ := http.PostForm(endpoint, vals)
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
var response apiSyncUpdateResponse
|
var response struct {
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Error null.String `json:"error"`
|
||||||
ok, will look into that ok, will look into that
|
|||||||
|
Data null.String `json:"data"`
|
||||||
|
}
|
||||||
err := json.Unmarshal(body, &response)
|
err := json.Unmarshal(body, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -237,7 +251,7 @@ func (s SyncManager) Start() error {
|
||||||
time.Sleep(5 * time.Minute)
|
time.Sleep(5 * time.Minute)
|
||||||
}
|
}
|
||||||
for i, sync := range syncs {
|
for i, sync := range syncs {
|
||||||
SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount)
|
SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
|
||||||
err := sync.FullCycle()
|
err := sync.FullCycle()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fatalErrors := []string{
|
fatalErrors := []string{
|
||||||
|
@ -251,7 +265,7 @@ func (s SyncManager) Start() error {
|
||||||
}
|
}
|
||||||
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
|
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
|
||||||
}
|
}
|
||||||
SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount)
|
SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
|
||||||
syncCount++
|
syncCount++
|
||||||
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
|
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
|
||||||
shouldInterruptLoop = true
|
shouldInterruptLoop = true
|
||||||
|
|
|
@ -14,8 +14,8 @@ import (
|
||||||
|
|
||||||
func (s *Sync) walletSetup() error {
|
func (s *Sync) walletSetup() error {
|
||||||
//prevent unnecessary concurrent execution
|
//prevent unnecessary concurrent execution
|
||||||
s.mux.Lock()
|
s.walletMux.Lock()
|
||||||
defer s.mux.Unlock()
|
defer s.walletMux.Unlock()
|
||||||
err := s.ensureChannelOwnership()
|
err := s.ensureChannelOwnership()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
why are we using a mutex lock? What is wrong with concurrent execution? A mutex is not the standard way to handle synchronization. It can cause deadlocks. Ideally if you want things to run synchronously from different go routines, you use channels in golang. why are we using a mutex lock? What is wrong with concurrent execution? A mutex is not the standard way to handle synchronization. It can cause deadlocks. Ideally if you want things to run synchronously from different go routines, you use channels in golang.
this is the only place the lock is used and no multiple locks are being held that can cause a deadlock here. this is the only place the lock is used and no multiple locks are being held that can cause a deadlock here.
However I would like to discuss channels with you to understand how they could be used here (not sure they can)
|
|||||||
return err
|
return err
|
||||||
|
@ -30,31 +30,32 @@ func (s *Sync) walletSetup() error {
|
||||||
balance := decimal.Decimal(*balanceResp)
|
balance := decimal.Decimal(*balanceResp)
|
||||||
log.Debugf("Starting balance is %s", balance.String())
|
log.Debugf("Starting balance is %s", balance.String())
|
||||||
|
|
||||||
var numOnSource uint64
|
var numOnSource int
|
||||||
if s.LbryChannelName == "@UCBerkeley" {
|
if s.LbryChannelName == "@UCBerkeley" {
|
||||||
numOnSource = 10104
|
numOnSource = 10104
|
||||||
} else {
|
} else {
|
||||||
numOnSource, err = s.CountVideos()
|
n, err := s.CountVideos()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
numOnSource = int(n)
|
||||||
}
|
}
|
||||||
log.Debugf("Source channel has %d videos", numOnSource)
|
log.Debugf("Source channel has %d videos", numOnSource)
|
||||||
why are all these cast to floats? they can be ints here why are all these cast to floats? they can be ints here
no clue. but you're completely right. Will fix no clue. but you're completely right. Will fix
|
|||||||
if numOnSource == 0 {
|
if numOnSource == 0 {
|
||||||
return nil
|
return nil
|
||||||
why change this to an int? It better to be more specific than more general. why change this to an int? It better to be more specific than more general.
Grins previous review outlined a mess with casts here and there to make simple math. Grins previous review outlined a mess with casts here and there to make simple math.
I changed everything to int as it's reasonable for the values they represent.
|
|||||||
}
|
}
|
||||||
|
|
||||||
what is this?! not part of the PR but this is not good to have in the code base. what is this?! not part of the PR but this is not good to have in the code base.
I can explain the berkeley stuff to you via DM, it's all good as it will be eventually removed from here. Not worth changing now I can explain the berkeley stuff to you via DM, it's all good as it will be eventually removed from here. Not worth changing now
|
|||||||
numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName)
|
s.syncedVideosMux.Lock()
|
||||||
if err != nil {
|
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
|
||||||
return err
|
s.syncedVideosMux.Unlock()
|
||||||
}
|
|
||||||
log.Debugf("We already published %d videos", numPublished)
|
log.Debugf("We already published %d videos", numPublished)
|
||||||
|
|
||||||
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {
|
if numOnSource-numPublished > s.Manager.VideosLimit {
|
||||||
numOnSource = uint64(s.Manager.VideosLimit)
|
numOnSource = s.Manager.VideosLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
|
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
|
||||||
if numPublished > numOnSource {
|
if numPublished > numOnSource && balance.LessThan(decimal.NewFromFloat(1)) {
|
||||||
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
|
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
|
||||||
minBalance = 1 //since we ended up in this function it means some juice is still needed
|
minBalance = 1 //since we ended up in this function it means some juice is still needed
|
||||||
}
|
}
|
||||||
|
@ -95,7 +96,6 @@ func (s *Sync) walletSetup() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) ensureEnoughUTXOs() error {
|
func (s *Sync) ensureEnoughUTXOs() error {
|
||||||
|
|
||||||
utxolist, err := s.daemon.UTXOList()
|
utxolist, err := s.daemon.UTXOList()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -103,14 +103,6 @@ func (s *Sync) ensureEnoughUTXOs() error {
|
||||||
return errors.Err("no response")
|
return errors.Err("no response")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !allUTXOsConfirmed(utxolist) {
|
|
||||||
log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run
|
|
||||||
err := s.waitUntilUTXOsConfirmed()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
target := 40
|
target := 40
|
||||||
count := 0
|
count := 0
|
||||||
|
|
||||||
|
@ -141,12 +133,13 @@ func (s *Sync) ensureEnoughUTXOs() error {
|
||||||
return errors.Err("no response")
|
return errors.Err("no response")
|
||||||
}
|
}
|
||||||
|
|
||||||
wait := 15 * time.Second
|
err = s.waitForNewBlock()
|
||||||
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses")
|
if err != nil {
|
||||||
time.Sleep(wait)
|
return err
|
||||||
|
}
|
||||||
log.Println("Creating UTXOs and waiting for them to be confirmed")
|
} else if !allUTXOsConfirmed(utxolist) {
|
||||||
err = s.waitUntilUTXOsConfirmed()
|
log.Println("Waiting for previous txns to confirm")
|
||||||
|
err := s.waitForNewBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -155,28 +148,31 @@ func (s *Sync) ensureEnoughUTXOs() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) waitUntilUTXOsConfirmed() error {
|
func (s *Sync) waitForNewBlock() error {
|
||||||
origin := time.Now()
|
status, err := s.daemon.Status()
|
||||||
for {
|
if err != nil {
|
||||||
r, err := s.daemon.UTXOList()
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 {
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
status, err = s.daemon.Status()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if r == nil {
|
|
||||||
return errors.Err("no response")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if allUTXOsConfirmed(r) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if time.Now().After(origin.Add(15 * time.Minute)) {
|
|
||||||
//lbryum is messing with us or something. restart the daemon
|
|
||||||
//this could also be a very long block
|
|
||||||
SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String())
|
|
||||||
}
|
|
||||||
wait := 30 * time.Second
|
|
||||||
log.Println("Waiting " + wait.String() + "...")
|
|
||||||
time.Sleep(wait)
|
|
||||||
}
|
}
|
||||||
|
currentBlock := status.Wallet.Blocks
|
||||||
|
for i := 0; status.Wallet.Blocks <= currentBlock; i++ {
|
||||||
|
if i%3 == 0 {
|
||||||
|
log.Printf("Waiting for new block (%d)...", currentBlock+1)
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
status, err = s.daemon.Status()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) ensureChannelOwnership() error {
|
func (s *Sync) ensureChannelOwnership() error {
|
||||||
|
@ -194,6 +190,7 @@ func (s *Sync) ensureChannelOwnership() error {
|
||||||
isChannelMine := false
|
isChannelMine := false
|
||||||
for _, channel := range *channels {
|
for _, channel := range *channels {
|
||||||
if channel.Name == s.LbryChannelName {
|
if channel.Name == s.LbryChannelName {
|
||||||
|
s.lbryChannelID = channel.ClaimID
|
||||||
isChannelMine = true
|
isChannelMine = true
|
||||||
} else {
|
} else {
|
||||||
return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?")
|
return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?")
|
||||||
|
@ -232,16 +229,11 @@ func (s *Sync) ensureChannelOwnership() error {
|
||||||
s.addCredits(channelBidAmount + 0.1)
|
s.addCredits(channelBidAmount + 0.1)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount)
|
c, err := s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.lbryChannelID = c.ClaimID
|
||||||
// niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through"
|
|
||||||
wait := 15 * time.Second
|
|
||||||
log.Println("Waiting " + wait.String() + " for channel claim to go through")
|
|
||||||
time.Sleep(wait)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -170,25 +170,23 @@ func (v ucbVideo) saveThumbnail() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
|
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
||||||
options := jsonrpc.PublishOptions{
|
options := jsonrpc.PublishOptions{
|
||||||
Title: &v.title,
|
Title: &v.title,
|
||||||
Author: strPtr("UC Berkeley"),
|
Author: strPtr("UC Berkeley"),
|
||||||
Description: strPtr(v.getAbbrevDescription()),
|
Description: strPtr(v.getAbbrevDescription()),
|
||||||
Language: strPtr("en"),
|
Language: strPtr("en"),
|
||||||
ClaimAddress: &claimAddress,
|
ClaimAddress: &claimAddress,
|
||||||
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
|
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
|
||||||
License: strPtr("see description"),
|
License: strPtr("see description"),
|
||||||
}
|
ChannelID: &channelID,
|
||||||
|
ChangeAddress: &claimAddress,
|
||||||
if channelName != "" {
|
|
||||||
options.ChannelName = &channelName
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
Why is the author UC Berkley? should this function be called publishUCBerkley? Why is the author UC Berkley? should this function be called publishUCBerkley?
ahhh, ahhh, `ucbVideo` Seems odd to have a struct type for a specific author.
|
|||||||
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) {
|
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -202,7 +200,7 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float
|
||||||
//}
|
//}
|
||||||
//log.Debugln("Created thumbnail for " + v.id)
|
//log.Debugln("Created thumbnail for " + v.id)
|
||||||
|
|
||||||
summary, err := v.publish(daemon, claimAddress, amount, channelName)
|
summary, err := v.publish(daemon, claimAddress, amount, channelID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Prefix("publish error", err)
|
return nil, errors.Prefix("publish error", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ func (v YoutubeVideo) getFilename() string {
|
||||||
|
|
||||||
name := chunks[0]
|
name := chunks[0]
|
||||||
if len(name) > maxLen {
|
if len(name) > maxLen {
|
||||||
return name[:maxLen]
|
name = name[:maxLen]
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, chunk := range chunks[1:] {
|
for _, chunk := range chunks[1:] {
|
||||||
|
@ -185,7 +185,10 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
|
||||||
|
|
||||||
func strPtr(s string) *string { return &s }
|
func strPtr(s string) *string { return &s }
|
||||||
|
|
||||||
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
|
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
||||||
|
if channelID == "" {
|
||||||
|
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
|
||||||
|
}
|
||||||
options := jsonrpc.PublishOptions{
|
options := jsonrpc.PublishOptions{
|
||||||
Title: &v.title,
|
Title: &v.title,
|
||||||
Author: &v.channelTitle,
|
Author: &v.channelTitle,
|
||||||
|
@ -195,15 +198,12 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
|
||||||
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
|
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
|
||||||
License: strPtr("Copyrighted (contact author)"),
|
License: strPtr("Copyrighted (contact author)"),
|
||||||
ChangeAddress: &claimAddress,
|
ChangeAddress: &claimAddress,
|
||||||
|
ChannelID: &channelID,
|
||||||
}
|
}
|
||||||
if channelName != "" {
|
|
||||||
options.ChannelName = &channelName
|
|
||||||
}
|
|
||||||
|
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) {
|
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -227,7 +227,7 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
|
||||||
}
|
}
|
||||||
log.Debugln("Created thumbnail for " + v.id)
|
log.Debugln("Created thumbnail for " + v.id)
|
||||||
|
|
||||||
summary, err := v.publish(daemon, claimAddress, amount, channelName)
|
summary, err := v.publish(daemon, claimAddress, amount, channelID)
|
||||||
//delete the video in all cases (and ignore the error)
|
//delete the video in all cases (and ignore the error)
|
||||||
_ = v.delete()
|
_ = v.delete()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
109
ytsync/ytsync.go
|
@ -61,16 +61,27 @@ type Sync struct {
|
||||||
Refill int
|
Refill int
|
||||||
Manager *SyncManager
|
Manager *SyncManager
|
||||||
|
|
||||||
daemon *jsonrpc.Client
|
daemon *jsonrpc.Client
|
||||||
if you have multiple mutexes, you can't call one of them just the mutexes should be pointers, so new copies are not created if the Sync struct is copied its clearer if you put the mutex right above the thing the mutex is locking, and leave newlines on either side. so in this example, upt
if you have multiple mutexes, you can't call one of them just `mux`. its not clear what that's for. it could be something like `walletSetupMux`. though once you name it that, it becomes clear that something might be wrong with wrapping the whole wallet setup in a single mutex. does the whole thing *actually* need to be locked?
the mutexes should be pointers, so new copies are not created if the Sync struct is copied
its clearer if you put the mutex right above the thing the mutex is locking, and leave newlines on either side. so in this example, upt `videosMapMux` right above `syncedVideos`. and then maybe rename it to `syncedVideosMux`, like so:
```
...other vars...
syncedVideosMux *sync.Mutex
syncedVideos map[string]syncedVideo
...other vars...
```
this should also be a pointer. but more importantly, you don't need this if you already have a stop.Group. stop.Group works as a combo WaitGroup + channel that will be closed to indicate stopping (and can be closed safely multiple times). so use this should also be a pointer. but more importantly, you don't need this if you already have a stop.Group. stop.Group works as a combo WaitGroup + channel that will be closed to indicate stopping (and can be closed safely multiple times). so use `grp.Add()` and `grp.Wait()` instead
it won't let me comment below, so I'm commenting here: you call
it won't let me comment below, so I'm commenting here:
you call `Add()` and `Done()` inside `startWorker(workerNum int)`, but the correct pattern is to make those calls outside the function. startWorker() doesn't know if its being run asynchronously or not. you only need a waitgroup if it is. there are also subtle concurrency issues with calling it inside the function. so the right way to go is to remove Add and Done from inside startWorker, and do this:
```
for i := 0; i < s.ConcurrentVideos; i++ {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.startWorker(i)
}()
}
```
you can use a read/write lock to lock the map for reading when you read it, and writing when you write to it. this lets multiple threads read the data at once, which is safe and blocks less. use you can use a read/write lock to lock the map for reading when you read it, and writing when you write to it. this lets multiple threads read the data at once, which is safe and blocks less. use `*sync.RWMutex` instead, and call `mux.RLock()` and `mux.RUnlock()` when you're only reading from the variable. leave `mux.Lock()` for writing
I need to look into that, this is from your original code and I don't remember ever changing it. Thanks for the pointers there. I need to look into that, this is from your original code and I don't remember ever changing it. Thanks for the pointers there.
Will rename and move the mutexes. Will rename and move the mutexes.
The reason I didn't use read/write locks is that i don't want the application to read when it's being written to, plus the locks are held for a very very short time so there would be no noticeable improvement, only a higher risk of race conditions happening during wallet refills.
I looked more into why we need to lock the whole walletSetup function. The reason I'm doing that is to avoid multiple threads from refilling the wallet concurrently. I don't think I can easily break up the the function to lock fewer lines of code. I think it's fair to leave it like that. I removed the wait group in favor of the stop group I looked more into why we need to lock the whole walletSetup function. The reason I'm doing that is to avoid multiple threads from refilling the wallet concurrently. I don't think I can easily break up the the function to lock fewer lines of code. I think it's fair to leave it like that.
I removed the wait group in favor of the stop group
a read/write lock does not allow reading during writing. thats the point of every lock. what it does allow is multiple concurrent reads. when you do RLock, others can RLock at the same time. when you Lock (for writing), no one's allowed to Lock or RLock at the same time. a read/write lock does not allow reading during writing. thats the point of every lock. what it does allow is multiple concurrent reads. when you do RLock, others can RLock at the same time. when you Lock (for writing), no one's allowed to Lock or RLock at the same time.
Oh yes, you're right, not sure what was going in my mind. I'll swap that Oh yes, you're right, not sure what was going in my mind. I'll swap that
|
|||||||
claimAddress string
|
claimAddress string
|
||||||
videoDirectory string
|
videoDirectory string
|
||||||
db *redisdb.DB
|
db *redisdb.DB
|
||||||
|
syncedVideos map[string]syncedVideo
|
||||||
|
syncedVideosMux *sync.Mutex
|
||||||
|
grp *stop.Group
|
||||||
|
lbryChannelID string
|
||||||
|
|
||||||
grp *stop.Group
|
walletMux *sync.Mutex
|
||||||
|
queue chan video
|
||||||
|
}
|
||||||
|
|
||||||
mux sync.Mutex
|
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
|
||||||
wg sync.WaitGroup
|
s.syncedVideosMux.Lock()
|
||||||
queue chan video
|
defer s.syncedVideosMux.Unlock()
|
||||||
|
s.syncedVideos[videoID] = syncedVideo{
|
||||||
|
VideoID: videoID,
|
||||||
|
Published: published,
|
||||||
|
FailureReason: failureReason,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
||||||
|
@ -110,10 +121,26 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
if s.YoutubeChannelID == "" {
|
if s.YoutubeChannelID == "" {
|
||||||
return errors.Err("channel ID not provided")
|
return errors.Err("channel ID not provided")
|
||||||
}
|
}
|
||||||
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
|
s.syncedVideosMux = &sync.Mutex{}
|
||||||
|
s.walletMux = &sync.Mutex{}
|
||||||
|
s.db = redisdb.New()
|
||||||
|
s.grp = stop.New()
|
||||||
|
s.queue = make(chan video)
|
||||||
|
interruptChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||||
|
go func() {
|
||||||
|
<-interruptChan
|
||||||
|
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
||||||
|
s.grp.Stop()
|
||||||
|
}()
|
||||||
|
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.syncedVideosMux.Lock()
|
||||||
|
s.syncedVideos = syncedVideos
|
||||||
|
s.syncedVideosMux.Unlock()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if e != nil {
|
if e != nil {
|
||||||
//conditions for which a channel shouldn't be marked as failed
|
//conditions for which a channel shouldn't be marked as failed
|
||||||
|
@ -123,14 +150,14 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
if util.SubstringInSlice(e.Error(), noFailConditions) {
|
if util.SubstringInSlice(e.Error(), noFailConditions) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
|
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
||||||
err = errors.Prefix(msg, err)
|
err = errors.Prefix(msg, err)
|
||||||
e = errors.Prefix(err.Error(), e)
|
e = errors.Prefix(err.Error(), e)
|
||||||
}
|
}
|
||||||
} else if !s.IsInterrupted() {
|
} else if !s.IsInterrupted() {
|
||||||
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
|
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e = err
|
e = err
|
||||||
}
|
}
|
||||||
|
@ -181,18 +208,6 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
return errors.Wrap(err, 0)
|
return errors.Wrap(err, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.db = redisdb.New()
|
|
||||||
s.grp = stop.New()
|
|
||||||
s.queue = make(chan video)
|
|
||||||
|
|
||||||
interruptChan := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
|
||||||
go func() {
|
|
||||||
<-interruptChan
|
|
||||||
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
|
||||||
s.grp.Stop()
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Printf("Starting daemon")
|
log.Printf("Starting daemon")
|
||||||
err = startDaemonViaSystemd()
|
err = startDaemonViaSystemd()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -247,7 +262,11 @@ func (s *Sync) doSync() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < s.ConcurrentVideos; i++ {
|
for i := 0; i < s.ConcurrentVideos; i++ {
|
||||||
go s.startWorker(i)
|
s.grp.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer s.grp.Done()
|
||||||
|
s.startWorker(i)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.LbryChannelName == "@UCBerkeley" {
|
if s.LbryChannelName == "@UCBerkeley" {
|
||||||
|
@ -256,14 +275,11 @@ func (s *Sync) doSync() error {
|
||||||
err = s.enqueueYoutubeVideos()
|
err = s.enqueueYoutubeVideos()
|
||||||
}
|
}
|
||||||
close(s.queue)
|
close(s.queue)
|
||||||
s.wg.Wait()
|
s.grp.Wait()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) startWorker(workerNum int) {
|
func (s *Sync) startWorker(workerNum int) {
|
||||||
s.wg.Add(1)
|
|
||||||
defer s.wg.Done()
|
|
||||||
|
|
||||||
var v video
|
var v video
|
||||||
var more bool
|
var more bool
|
||||||
|
|
||||||
|
@ -300,6 +316,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
"no space left on device",
|
"no space left on device",
|
||||||
"NotEnoughFunds",
|
"NotEnoughFunds",
|
||||||
"Cannot publish using channel",
|
"Cannot publish using channel",
|
||||||
|
"more than 90% of the space has been used.",
|
||||||
}
|
}
|
||||||
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
|
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
|
||||||
s.grp.Stop()
|
s.grp.Stop()
|
||||||
|
@ -336,6 +353,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
}
|
}
|
||||||
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
||||||
}
|
}
|
||||||
|
s.AppendSyncedVideo(v.ID(), false, err.Error())
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
|
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||||
|
@ -493,10 +511,32 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
log.Println(v.ID() + " took " + time.Since(start).String())
|
log.Println(v.ID() + " took " + time.Since(start).String())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
alreadyPublished, err := s.db.IsPublished(v.ID())
|
s.syncedVideosMux.Lock()
|
||||||
|
sv, ok := s.syncedVideos[v.ID()]
|
||||||
|
s.syncedVideosMux.Unlock()
|
||||||
|
alreadyPublished := ok && sv.Published
|
||||||
|
|
||||||
|
neverRetryFailures := []string{
|
||||||
|
"Error extracting sts from embedded url response",
|
||||||
|
"the video is too big to sync, skipping for now",
|
||||||
|
}
|
||||||
|
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) {
|
||||||
|
log.Println(v.ID() + " can't ever be published")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO: remove this after a few runs...
|
||||||
|
alreadyPublishedOld, err := s.db.IsPublished(v.ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
//TODO: remove this after a few runs...
|
||||||
|
if alreadyPublishedOld && !alreadyPublished {
|
||||||
|
//seems like something in the migration of blobs didn't go perfectly right so warn about it!
|
||||||
|
SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s",
|
||||||
|
s.YoutubeChannelID, v.ID())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if alreadyPublished {
|
if alreadyPublished {
|
||||||
log.Println(v.ID() + " already published")
|
log.Println(v.ID() + " already published")
|
||||||
|
@ -507,18 +547,19 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
log.Println(v.ID() + " is old: skipping")
|
log.Println(v.ID() + " is old: skipping")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName, s.Manager.MaxVideoSize)
|
err = s.Manager.checkUsedSpace()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
|
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
|
||||||
if err != nil {
|
|
||||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
|
||||||
}
|
|
||||||
err = s.db.SetPublished(v.ID())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.AppendSyncedVideo(v.ID(), true, "")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
you don't have to create a named struct for this if its only used in one place. you can inline it as