recover ytsync panics when processing videos, switch to our errors everywhere

This commit is contained in:
Alex Grintsvayg 2018-03-09 11:47:38 -05:00
parent 393e2c4e80
commit daffded4f0
6 changed files with 58 additions and 45 deletions

View file

@ -3,7 +3,8 @@ package ytsync
import ( import (
"net/http" "net/http"
"github.com/go-errors/errors" "github.com/lbryio/lbry.go/errors"
"google.golang.org/api/googleapi/transport" "google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3" "google.golang.org/api/youtube/v3"
) )
@ -15,16 +16,16 @@ func (s *Sync) CountVideos() (uint64, error) {
service, err := youtube.New(client) service, err := youtube.New(client)
if err != nil { if err != nil {
return 0, errors.WrapPrefix(err, "error creating YouTube service", 0) return 0, errors.Prefix("error creating YouTube service", err)
} }
response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do() response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do()
if err != nil { if err != nil {
return 0, errors.WrapPrefix(err, "error getting channels", 0) return 0, errors.Prefix("error getting channels", err)
} }
if len(response.Items) < 1 { if len(response.Items) < 1 {
return 0, errors.New("youtube channel not found") return 0, errors.Err("youtube channel not found")
} }
return response.Items[0].Statistics.VideoCount, nil return response.Items[0].Statistics.VideoCount, nil

View file

@ -3,8 +3,9 @@ package redisdb
import ( import (
"time" "time"
"github.com/lbryio/lbry.go/errors"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"github.com/go-errors/errors"
) )
const ( const (
@ -39,7 +40,7 @@ func (r DB) IsPublished(id string) (bool, error) {
alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id)) alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id))
if err != nil && err != redis.ErrNil { if err != nil && err != redis.ErrNil {
return false, errors.WrapPrefix(err, "redis error", 0) return false, errors.Prefix("redis error", err)
} }
@ -56,7 +57,7 @@ func (r DB) SetPublished(id string) error {
_, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal)) _, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal))
if err != nil { if err != nil {
return errors.New("redis error: " + err.Error()) return errors.Prefix("redis error", err)
} }
return nil return nil
} }

View file

@ -4,10 +4,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/lbrycrd" "github.com/lbryio/lbry.go/lbrycrd"
"github.com/go-errors/errors"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -22,7 +22,7 @@ func (s *Sync) walletSetup() error {
if err != nil { if err != nil {
return err return err
} else if balanceResp == nil { } else if balanceResp == nil {
return errors.New("no response") return errors.Err("no response")
} }
balance := decimal.Decimal(*balanceResp) balance := decimal.Decimal(*balanceResp)
log.Debugf("Starting balance is %s", balance.String()) log.Debugf("Starting balance is %s", balance.String())
@ -59,11 +59,11 @@ func (s *Sync) walletSetup() error {
if err != nil { if err != nil {
return err return err
} else if claimAddress == nil { } else if claimAddress == nil {
return errors.New("could not get unused address") return errors.Err("could not get unused address")
} }
s.claimAddress = string(*claimAddress) s.claimAddress = string(*claimAddress)
if s.claimAddress == "" { if s.claimAddress == "" {
return errors.New("found blank claim address") return errors.Err("found blank claim address")
} }
err = s.ensureEnoughUTXOs() err = s.ensureEnoughUTXOs()
@ -79,7 +79,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
if err != nil { if err != nil {
return err return err
} else if utxolist == nil { } else if utxolist == nil {
return errors.New("no response") return errors.Err("no response")
} }
if !allUTXOsConfirmed(utxolist) { if !allUTXOsConfirmed(utxolist) {
@ -103,7 +103,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
if err != nil { if err != nil {
return err return err
} else if balance == nil { } else if balance == nil {
return errors.New("no response") return errors.Err("no response")
} }
log.Println("balance is " + decimal.Decimal(*balance).String()) log.Println("balance is " + decimal.Decimal(*balance).String())
@ -114,9 +114,9 @@ func (s *Sync) ensureEnoughUTXOs() error {
if err != nil { if err != nil {
return err return err
} else if prefillTx == nil { } else if prefillTx == nil {
return errors.New("no response") return errors.Err("no response")
} else if !prefillTx.Complete || !prefillTx.Broadcast { } else if !prefillTx.Complete || !prefillTx.Broadcast {
return errors.New("failed to prefill addresses") return errors.Err("failed to prefill addresses")
} }
wait := 15 * time.Second wait := 15 * time.Second
@ -139,7 +139,7 @@ func (s *Sync) waitUntilUTXOsConfirmed() error {
if err != nil { if err != nil {
return err return err
} else if r == nil { } else if r == nil {
return errors.New("no response") return errors.Err("no response")
} }
if allUTXOsConfirmed(r) { if allUTXOsConfirmed(r) {
@ -154,14 +154,14 @@ func (s *Sync) waitUntilUTXOsConfirmed() error {
func (s *Sync) ensureChannelOwnership() error { func (s *Sync) ensureChannelOwnership() error {
if s.LbryChannelName == "" { if s.LbryChannelName == "" {
return errors.New("no channel name set") return errors.Err("no channel name set")
} }
channels, err := s.daemon.ChannelListMine() channels, err := s.daemon.ChannelListMine()
if err != nil { if err != nil {
return err return err
} else if channels == nil { } else if channels == nil {
return errors.New("no channel response") return errors.Err("no channel response")
} }
isChannelMine := false isChannelMine := false
@ -169,7 +169,7 @@ func (s *Sync) ensureChannelOwnership() error {
if channel.Name == s.LbryChannelName { if channel.Name == s.LbryChannelName {
isChannelMine = true isChannelMine = true
} else { } else {
return errors.New("this wallet has multiple channels. maybe something went wrong during setup?") return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?")
} }
} }
if isChannelMine { if isChannelMine {
@ -187,7 +187,7 @@ func (s *Sync) ensureChannelOwnership() error {
channelNotFound := channel.Error != nil && strings.Contains(*(channel.Error), "cannot be resolved") channelNotFound := channel.Error != nil && strings.Contains(*(channel.Error), "cannot be resolved")
if !channelNotFound { if !channelNotFound {
if !s.TakeOverExistingChannel { if !s.TakeOverExistingChannel {
return errors.New("Channel exists and we don't own it. Pick another channel.") return errors.Err("Channel exists and we don't own it. Pick another channel.")
} }
log.Println("Channel exists and we don't own it. Outbidding existing claim.") log.Println("Channel exists and we don't own it. Outbidding existing claim.")
channelBidAmount, _ = channel.Certificate.Amount.Add(decimal.NewFromFloat(channelClaimAmount)).Float64() channelBidAmount, _ = channel.Certificate.Amount.Add(decimal.NewFromFloat(channelClaimAmount)).Float64()
@ -197,7 +197,7 @@ func (s *Sync) ensureChannelOwnership() error {
if err != nil { if err != nil {
return err return err
} else if balanceResp == nil { } else if balanceResp == nil {
return errors.New("no response") return errors.Err("no response")
} }
balance := decimal.Decimal(*balanceResp) balance := decimal.Decimal(*balanceResp)
@ -247,7 +247,7 @@ func (s *Sync) addCredits(amountToAdd float64) error {
if err != nil { if err != nil {
return err return err
} else if addressResp == nil { } else if addressResp == nil {
return errors.New("no response") return errors.Err("no response")
} }
address := string(*addressResp) address := string(*addressResp)

View file

@ -11,11 +11,11 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/go-errors/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -136,7 +136,7 @@ func (v ucbVideo) download() error {
if err != nil { if err != nil {
return err return err
} else if bytesWritten == 0 { } else if bytesWritten == 0 {
return errors.New("zero bytes written") return errors.Err("zero bytes written")
} }
return nil return nil
@ -188,7 +188,7 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
return errors.WrapPrefix(err, "download error", 0) return errors.Prefix("download error", err)
} }
log.Debugln("Downloaded " + v.id) log.Debugln("Downloaded " + v.id)
@ -200,7 +200,7 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float
err = v.publish(daemon, claimAddress, amount, channelName) err = v.publish(daemon, claimAddress, amount, channelName)
if err != nil { if err != nil {
return errors.WrapPrefix(err, "publish error", 0) return errors.Prefix("publish error", err)
} }
return nil return nil

View file

@ -11,9 +11,9 @@ import (
"strings" "strings"
"time" "time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/go-errors/errors"
ytdl "github.com/kkdai/youtube" ytdl "github.com/kkdai/youtube"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3" "google.golang.org/api/youtube/v3"
@ -150,7 +150,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
} }
if decoded.error != 0 { if decoded.error != 0 {
return errors.New("error creating thumbnail: " + decoded.message) return errors.Err("error creating thumbnail: " + decoded.message)
} }
return nil return nil
@ -179,19 +179,19 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
return errors.WrapPrefix(err, "download error", 0) return errors.Prefix("download error", err)
} }
log.Debugln("Downloaded " + v.id) log.Debugln("Downloaded " + v.id)
err = v.triggerThumbnailSave() err = v.triggerThumbnailSave()
if err != nil { if err != nil {
return errors.WrapPrefix(err, "thumbnail error", 0) return errors.Prefix("thumbnail error", err)
} }
log.Debugln("Created thumbnail for " + v.id) log.Debugln("Created thumbnail for " + v.id)
err = v.publish(daemon, claimAddress, amount, channelName) err = v.publish(daemon, claimAddress, amount, channelName)
if err != nil { if err != nil {
return errors.WrapPrefix(err, "publish error", 0) return errors.Prefix("publish error", err)
} }
return nil return nil

View file

@ -16,12 +16,12 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stopOnce"
"github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources" "github.com/lbryio/lbry.go/ytsync/sources"
"github.com/go-errors/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport" "google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3" "google.golang.org/api/youtube/v3"
@ -70,7 +70,7 @@ type Sync struct {
func (s *Sync) FullCycle() error { func (s *Sync) FullCycle() error {
var err error var err error
if os.Getenv("HOME") == "" { if os.Getenv("HOME") == "" {
return errors.New("no $HOME env var found") return errors.Err("no $HOME env var found")
} }
if s.YoutubeChannelID == "" { if s.YoutubeChannelID == "" {
@ -85,7 +85,7 @@ func (s *Sync) FullCycle() error {
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.New("default_wallet already exists") return errors.Err("default_wallet already exists")
} }
if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) { if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
@ -255,25 +255,25 @@ func (s *Sync) enqueueYoutubeVideos() error {
service, err := youtube.New(client) service, err := youtube.New(client)
if err != nil { if err != nil {
return errors.WrapPrefix(err, "error creating YouTube service", 0) return errors.Prefix("error creating YouTube service", err)
} }
response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do() response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do()
if err != nil { if err != nil {
return errors.WrapPrefix(err, "error getting channels", 0) return errors.Prefix("error getting channels", err)
} }
if len(response.Items) < 1 { if len(response.Items) < 1 {
return errors.New("youtube channel not found") return errors.Err("youtube channel not found")
} }
if response.Items[0].ContentDetails.RelatedPlaylists == nil { if response.Items[0].ContentDetails.RelatedPlaylists == nil {
return errors.New("no related playlists") return errors.Err("no related playlists")
} }
playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads
if playlistID == "" { if playlistID == "" {
return errors.New("no channel playlist") return errors.Err("no channel playlist")
} }
var videos []video var videos []video
@ -287,11 +287,11 @@ func (s *Sync) enqueueYoutubeVideos() error {
playlistResponse, err := req.Do() playlistResponse, err := req.Do()
if err != nil { if err != nil {
return errors.WrapPrefix(err, "error getting playlist items", 0) return errors.Prefix("error getting playlist items", err)
} }
if len(playlistResponse.Items) < 1 { if len(playlistResponse.Items) < 1 {
return errors.New("playlist items not found") return errors.Err("playlist items not found")
} }
for _, item := range playlistResponse.Items { for _, item := range playlistResponse.Items {
@ -380,7 +380,18 @@ Enqueue:
return nil return nil
} }
func (s *Sync) processVideo(v video) error { func (s *Sync) processVideo(v video) (err error) {
defer func() {
if p := recover(); p != nil {
var ok bool
err, ok = p.(error)
if !ok {
err = errors.Err("%v", p)
}
err = errors.Wrap(p, 2)
}
}()
log.Println("Processing " + v.IDAndNum()) log.Println("Processing " + v.IDAndNum())
defer func(start time.Time) { defer func(start time.Time) {
log.Println(v.ID() + " took " + time.Since(start).String()) log.Println(v.ID() + " took " + time.Since(start).String())
@ -412,7 +423,7 @@ func (s *Sync) processVideo(v video) error {
func startDaemonViaSystemd() error { func startDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run() err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run()
if err != nil { if err != nil {
return errors.New(err) return errors.Err(err)
} }
return nil return nil
} }
@ -420,7 +431,7 @@ func startDaemonViaSystemd() error {
func stopDaemonViaSystemd() error { func stopDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run() err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run()
if err != nil { if err != nil {
return errors.New(err) return errors.Err(err)
} }
return nil return nil
} }
@ -439,7 +450,7 @@ func getChannelIDFromFile(channelName string) (string, error) {
channelID, ok := channels[channelName] channelID, ok := channels[channelName]
if !ok { if !ok {
return "", errors.New("channel not in list") return "", errors.Err("channel not in list")
} }
return channelID, nil return channelID, nil