add concurrency support

fix error handling (FINALLY)
fix slack message formatting
add locking for wallet setup to support concurrency
remove UTXO bug workaround (lbryumx should fix it)
add limits to filesize
This commit is contained in:
Niko Storni 2018-06-06 17:47:28 -04:00
parent ba5c1c6519
commit decb57bbed
3 changed files with 21 additions and 33 deletions

View file

@ -14,6 +14,9 @@ import (
) )
func (s *Sync) walletSetup() error { func (s *Sync) walletSetup() error {
//prevent unnecessary concurrent execution
s.mux.Lock()
defer s.mux.Unlock()
err := s.ensureChannelOwnership() err := s.ensureChannelOwnership()
if err != nil { if err != nil {
return err return err
@ -87,6 +90,7 @@ func (s *Sync) walletSetup() error {
} }
func (s *Sync) ensureEnoughUTXOs() error { func (s *Sync) ensureEnoughUTXOs() error {
utxolist, err := s.daemon.UTXOList() utxolist, err := s.daemon.UTXOList()
if err != nil { if err != nil {
return err return err
@ -162,32 +166,7 @@ func (s *Sync) waitUntilUTXOsConfirmed() error {
if time.Now().After(origin.Add(15 * time.Minute)) { if time.Now().After(origin.Add(15 * time.Minute)) {
//lbryum is messing with us or something. restart the daemon //lbryum is messing with us or something. restart the daemon
//this could also be a very long block //this could also be a very long block
err := stopDaemonViaSystemd() util.SendToSlackError("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String())
if err != nil {
logShutdownError(err)
return err
}
var waitTimeout time.Duration = 60 * 8
err = waitForDaemonProcess(waitTimeout)
if err != nil {
logShutdownError(err)
return err
}
err = startDaemonViaSystemd()
if err != nil {
return err
}
log.Infoln("Waiting for daemon to finish starting...")
s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(5 * time.Minute)
for {
_, err := s.daemon.WalletBalance()
if err == nil {
break
}
time.Sleep(5 * time.Second)
}
} }
wait := 30 * time.Second wait := 30 * time.Second
log.Println("Waiting " + wait.String() + "...") log.Println("Waiting " + wait.String() + "...")

View file

@ -126,6 +126,7 @@ func (v YoutubeVideo) delete() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Remove(videoPath) err := os.Remove(videoPath)
if err != nil { if err != nil {
log.Debugln(errors.Prefix("delete error", err))
return err return err
} }
log.Debugln(v.id + " deleted from disk (" + videoPath + ")") log.Debugln(v.id + " deleted from disk (" + videoPath + ")")
@ -201,6 +202,16 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
} }
log.Debugln("Downloaded " + v.id) log.Debugln("Downloaded " + v.id)
fi, err := os.Stat(v.getFilename())
if err != nil {
return err
}
if fi.Size() > 1*1024*1024*1024 {
//delete the video and ignore the error
_ = v.delete()
return errors.Err("video is bigger than 1GB, skipping for now")
}
err = v.triggerThumbnailSave() err = v.triggerThumbnailSave()
if err != nil { if err != nil {
return errors.Prefix("thumbnail error", err) return errors.Prefix("thumbnail error", err)
@ -208,13 +219,8 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
log.Debugln("Created thumbnail for " + v.id) log.Debugln("Created thumbnail for " + v.id)
err = v.publish(daemon, claimAddress, amount, channelName) err = v.publish(daemon, claimAddress, amount, channelName)
//delete the video in all cases //delete the video in all cases (and ignore the error)
dErr := v.delete() _ = v.delete()
if dErr != nil {
// the video was published anyway so it should be marked as published
// for that to happen, no errors should be returned any further than here
log.Debugln(errors.Prefix("delete error", dErr))
}
if err != nil { if err != nil {
return errors.Prefix("publish error", err) return errors.Prefix("publish error", err)
} }

View file

@ -67,6 +67,7 @@ type Sync struct {
grp *stop.Group grp *stop.Group
mux sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
queue chan video queue chan video
} }
@ -255,6 +256,7 @@ func (s *Sync) startWorker(workerNum int) {
fatalErrors := []string{ fatalErrors := []string{
":5279: read: connection reset by peer", ":5279: read: connection reset by peer",
"net/http: request canceled (Client.Timeout exceeded while awaiting headers)", "net/http: request canceled (Client.Timeout exceeded while awaiting headers)",
"no space left on device",
} }
if util.InSliceContains(err.Error(), fatalErrors) || s.StopOnError { if util.InSliceContains(err.Error(), fatalErrors) || s.StopOnError {
s.grp.Stop() s.grp.Stop()
@ -268,6 +270,7 @@ func (s *Sync) startWorker(workerNum int) {
"Playback on other websites has been disabled by the video owner", "Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file", "Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response", "Error extracting sts from embedded url response",
"Client.Timeout exceeded while awaiting headers)",
} }
if util.InSliceContains(err.Error(), errorsNoRetry) { if util.InSliceContains(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all") log.Println("This error should not be retried at all")