remove redisDB dependency #25

Merged
nikooo777 merged 12 commits from use_video_statuses into master 2018-08-20 13:56:26 +02:00
5 changed files with 50 additions and 61 deletions
Showing only changes of commit 3a51ea9b76 - Show all commits

View file

@ -122,9 +122,6 @@ func (s SyncManager) setChannelStatus(channelID string, status string) (map[stri
return nil, errors.Err(response.Error.String) return nil, errors.Err(response.Error.String)
} }
if response.Data != nil { if response.Data != nil {
if len(response.Data) == 0 {
return nil, nil
}
svs := make(map[string]syncedVideo) svs := make(map[string]syncedVideo)
for _, v := range response.Data { for _, v := range response.Data {
svs[v.VideoID] = v svs[v.VideoID] = v
@ -256,7 +253,7 @@ func (s SyncManager) Start() error {
time.Sleep(5 * time.Minute) time.Sleep(5 * time.Minute)
} }
for i, sync := range syncs { for i, sync := range syncs {
SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i+1, len(syncs), syncCount) SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount)
err := sync.FullCycle() err := sync.FullCycle()
if err != nil { if err != nil {
fatalErrors := []string{ fatalErrors := []string{
@ -270,7 +267,7 @@ func (s SyncManager) Start() error {
} }
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
} }
SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i+1, len(syncs), syncCount) SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount)
syncCount++ syncCount++
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
shouldInterruptLoop = true shouldInterruptLoop = true

View file

@ -99,7 +99,6 @@ func (s *Sync) walletSetup() error {
} }
func (s *Sync) ensureEnoughUTXOs() error { func (s *Sync) ensureEnoughUTXOs() error {
utxolist, err := s.daemon.UTXOList() utxolist, err := s.daemon.UTXOList()
if err != nil { if err != nil {
return err return err
@ -137,18 +136,13 @@ func (s *Sync) ensureEnoughUTXOs() error {
return errors.Err("no response") return errors.Err("no response")
} }
wait := 15 * time.Second err = s.waitForNewBlock()
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses")
time.Sleep(wait)
log.Println("Creating UTXOs and waiting for them to be confirmed")
err = s.waitUntilUTXOsConfirmed()
if err != nil { if err != nil {
return err return err
} }
} else if !allUTXOsConfirmed(utxolist) { } else if !allUTXOsConfirmed(utxolist) {
log.Println("Waiting for previous txns to confirm") log.Println("Waiting for previous txns to confirm")
err := s.waitUntilUTXOsConfirmed() err := s.waitForNewBlock()
if err != nil { if err != nil {
return err return err
} }
@ -157,28 +151,31 @@ func (s *Sync) ensureEnoughUTXOs() error {
return nil return nil
} }
func (s *Sync) waitUntilUTXOsConfirmed() error { func (s *Sync) waitForNewBlock() error {
origin := time.Now() status, err := s.daemon.Status()
for { if err != nil {
r, err := s.daemon.UTXOList() return err
}
for status.BlockchainStatus.Blocks == 0 || status.BlockchainStatus.BlocksBehind != 0 {
time.Sleep(5 * time.Second)
status, err = s.daemon.Status()
if err != nil { if err != nil {
return err return err
} else if r == nil {
return errors.Err("no response")
} }
if allUTXOsConfirmed(r) {
return nil
}
if time.Now().After(origin.Add(15 * time.Minute)) {
//lbryum is messing with us or something. restart the daemon
//this could also be a very long block
SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String())
}
wait := 30 * time.Second
log.Println("Waiting " + wait.String() + "...")
time.Sleep(wait)
} }
currentBlock := status.BlockchainStatus.Blocks
for i := 0; status.BlockchainStatus.Blocks <= currentBlock; i++ {
if i%3 == 0 {
log.Printf("Waiting for new block (%d)...", currentBlock+1)
}
time.Sleep(10 * time.Second)
status, err = s.daemon.Status()
if err != nil {
return err
}
}
return nil
} }
func (s *Sync) ensureChannelOwnership() error { func (s *Sync) ensureChannelOwnership() error {
@ -196,6 +193,7 @@ func (s *Sync) ensureChannelOwnership() error {
isChannelMine := false isChannelMine := false
for _, channel := range *channels { for _, channel := range *channels {
if channel.Name == s.LbryChannelName { if channel.Name == s.LbryChannelName {
s.lbryChannelID = channel.ClaimID
isChannelMine = true isChannelMine = true
} else { } else {
return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?") return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?")
@ -234,16 +232,11 @@ func (s *Sync) ensureChannelOwnership() error {
s.addCredits(channelBidAmount + 0.1) s.addCredits(channelBidAmount + 0.1)
} }
_, err = s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) c, err := s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount)
if err != nil { if err != nil {
return err return err
} }
s.lbryChannelID = c.ClaimID
// niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through"
wait := 15 * time.Second
log.Println("Waiting " + wait.String() + " for channel claim to go through")
time.Sleep(wait)
return nil return nil
} }

View file

@ -170,25 +170,23 @@ func (v ucbVideo) saveThumbnail() error {
return err return err
} }
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{ options := jsonrpc.PublishOptions{
Title: &v.title, Title: &v.title,
Author: strPtr("UC Berkeley"), Author: strPtr("UC Berkeley"),
Description: strPtr(v.getAbbrevDescription()), Description: strPtr(v.getAbbrevDescription()),
Language: strPtr("en"), Language: strPtr("en"),
ClaimAddress: &claimAddress, ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("see description"), License: strPtr("see description"),
} ChannelID: &channelID,
ChangeAddress: &claimAddress,
if channelName != "" {
options.ChannelName = &channelName
} }
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
tiger5226 commented 2018-08-17 01:13:22 +02:00 (Migrated from github.com)
Review

Why is the author UC Berkley? should this function be called publishUCBerkley?

Why is the author UC Berkley? should this function be called publishUCBerkley?
tiger5226 commented 2018-08-17 01:14:27 +02:00 (Migrated from github.com)
Review

ahhh, ucbVideo Seems odd to have a struct type for a specific author.

ahhh, `ucbVideo` Seems odd to have a struct type for a specific author.
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) { func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -202,7 +200,7 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float
//} //}
//log.Debugln("Created thumbnail for " + v.id) //log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelName) summary, err := v.publish(daemon, claimAddress, amount, channelID)
if err != nil { if err != nil {
return nil, errors.Prefix("publish error", err) return nil, errors.Prefix("publish error", err)
} }

View file

@ -185,7 +185,10 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s } func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
}
options := jsonrpc.PublishOptions{ options := jsonrpc.PublishOptions{
Title: &v.title, Title: &v.title,
Author: &v.channelTitle, Author: &v.channelTitle,
@ -195,15 +198,12 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("Copyrighted (contact author)"), License: strPtr("Copyrighted (contact author)"),
ChangeAddress: &claimAddress, ChangeAddress: &claimAddress,
ChannelID: &channelID,
} }
if channelName != "" {
options.ChannelName = &channelName
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) { func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -227,7 +227,7 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
} }
log.Debugln("Created thumbnail for " + v.id) log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelName) summary, err := v.publish(daemon, claimAddress, amount, channelID)
//delete the video in all cases (and ignore the error) //delete the video in all cases (and ignore the error)
_ = v.delete() _ = v.delete()
if err != nil { if err != nil {

View file

@ -67,6 +67,7 @@ type Sync struct {
db *redisdb.DB db *redisdb.DB
syncedVideos map[string]syncedVideo syncedVideos map[string]syncedVideo
grp *stop.Group grp *stop.Group
lbryChannelID string
videosMapMux sync.Mutex videosMapMux sync.Mutex
mux sync.Mutex mux sync.Mutex
@ -352,11 +353,11 @@ func (s *Sync) startWorker(workerNum int) {
} }
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
} }
s.AppendSyncedVideo(v.ID(), false, err.Error())
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error()) err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error())
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }
s.AppendSyncedVideo(v.ID(), false, err.Error())
} }
break break
} }
@ -546,7 +547,7 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " is old: skipping") log.Println(v.ID() + " is old: skipping")
return nil return nil
} }
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName, s.Manager.MaxVideoSize) summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize)
if err != nil { if err != nil {
return err return err
} }