diff --git a/manager.go b/manager.go index 825ddb5..bc38810 100644 --- a/manager.go +++ b/manager.go @@ -40,6 +40,10 @@ type SyncManager struct { VideosLimit int MaxVideoSize int LbrycrdString string + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string } const ( @@ -214,6 +218,10 @@ func (s SyncManager) Start() error { Refill: s.Refill, Manager: &s, LbrycrdString: s.LbrycrdString, + AwsS3ID: s.AwsS3ID, + AwsS3Secret: s.AwsS3Secret, + AwsS3Region: s.AwsS3Region, + AwsS3Bucket: s.AwsS3Bucket, } shouldInterruptLoop = true } else { @@ -245,6 +253,10 @@ func (s SyncManager) Start() error { Refill: s.Refill, Manager: &s, LbrycrdString: s.LbrycrdString, + AwsS3ID: s.AwsS3ID, + AwsS3Secret: s.AwsS3Secret, + AwsS3Region: s.AwsS3Region, + AwsS3Bucket: s.AwsS3Bucket, }) } } diff --git a/ytsync.go b/ytsync.go index 70dee4e..808b4f6 100644 --- a/ytsync.go +++ b/ytsync.go @@ -17,6 +17,12 @@ import ( "syscall" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" @@ -61,6 +67,10 @@ type Sync struct { Refill int Manager *SyncManager LbrycrdString string + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string daemon *jsonrpc.Client claimAddress string @@ -115,6 +125,97 @@ func (s *Sync) IsInterrupted() bool { } } +func (s *Sync) downloadWallet() error { + defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" + defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet" + key := aws.String("/wallets/" + s.YoutubeChannelID) + if os.Getenv("REGTEST") == "true" { + defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" + defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet" + key = aws.String("/regtest/" + s.YoutubeChannelID) + } + + if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { + return errors.Err("default_wallet already exists") + } + + creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") + s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + if err != nil { + return err + } + downloader := s3manager.NewDownloader(s3Session) + out, err := os.Create(defaultTempWalletDir) + if err != nil { + return err + } + defer out.Close() + + bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ + Bucket: aws.String(s.AwsS3Bucket), + Key: key, + }) + if err != nil { + // Casting to the awserr.Error type will allow you to inspect the error + // code returned by the service in code. The error code can be used + // to switch on context specific functionality. In this case a context + // specific error message is printed to the user based on the bucket + // and key existing. + // + // For information on other S3 API error codes see: + // http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if aerr, ok := err.(awserr.Error); ok { + code := aerr.Code() + if code == s3.ErrCodeNoSuchKey { + return errors.Err("wallet not on S3") + } + } + return err + } else if bytesWritten == 0 { + return errors.Err("zero bytes written") + } + + return os.Rename(defaultTempWalletDir, defaultWalletDir) +} + +func (s *Sync) uploadWallet() error { + defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" + key := aws.String("/wallets/" + s.YoutubeChannelID) + if os.Getenv("REGTEST") == "true" { + defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" + key = aws.String("/regtest/" + s.YoutubeChannelID) + } + + if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) { + return errors.Err("default_wallet does not exist") + } + + creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") + s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + if err != nil { + return err + } + + uploader := s3manager.NewUploader(s3Session) + + file, err := os.Open(defaultWalletDir) + if err != nil { + return err + } + defer file.Close() + + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.AwsS3Bucket), + Key: key, + Body: file, + }) + if err != nil { + return err + } + + return os.Remove(defaultWalletDir) +} + func (s *Sync) FullCycle() (e error) { if os.Getenv("HOME") == "" { return errors.Err("no $HOME env var found") @@ -142,74 +243,18 @@ func (s *Sync) FullCycle() (e error) { s.syncedVideos = syncedVideos s.syncedVideosMux.Unlock() - defer func() { - if e != nil { - //conditions for which a channel shouldn't be marked as failed - noFailConditions := []string{ - "this youtube channel is being managed by another server", - } - if util.SubstringInSlice(e.Error(), noFailConditions) { - return - } - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) - if err != nil { - msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) - err = errors.Prefix(msg, err) - e = errors.Prefix(err.Error(), e) - } - } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) - if err != nil { - e = err - } - } - }() + defer s.updateChannelStatus(&e) - defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" - if os.Getenv("REGTEST") == "true" { - defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" - } - walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) - newWalletBackupDir := os.Getenv("HOME") + "/renamed_wallets/" + s.YoutubeChannelID - - if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { - return errors.Err("default_wallet already exists") - } - - if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) { - err = os.Rename(walletBackupDir, defaultWalletDir) - if err != nil { - return errors.Wrap(err, 0) - } - log.Println("Continuing previous upload") - } else if _, err = os.Stat(newWalletBackupDir); !os.IsNotExist(err) { - err = os.Rename(newWalletBackupDir, defaultWalletDir) - if err != nil { - return errors.Wrap(err, 0) - } + err = s.downloadWallet() + if err != nil && err.Error() != "wallet not on S3" { + return errors.Prefix("failure in downloading wallet: ", err) + } else if err == nil { log.Println("Continuing previous upload") + } else { + log.Println("Starting new wallet") } - defer func() { - log.Printf("Stopping daemon") - shutdownErr := stopDaemonViaSystemd() - if shutdownErr != nil { - logShutdownError(shutdownErr) - } else { - // the cli will return long before the daemon effectively stops. we must observe the processes running - // before moving the wallet - waitTimeout := 8 * time.Minute - processDeathError := waitForDaemonProcess(waitTimeout) - if processDeathError != nil { - logShutdownError(processDeathError) - } else { - walletErr := os.Rename(defaultWalletDir, newWalletBackupDir) - if walletErr != nil { - log.Errorf("error moving wallet to backup dir: %v", walletErr) - } - } - } - }() + defer s.stopAndUploadWallet(&e) s.videoDirectory, err = ioutil.TempDir("", "ytsync") if err != nil { @@ -226,18 +271,9 @@ func (s *Sync) FullCycle() (e error) { s.daemon = jsonrpc.NewClient("") s.daemon.SetRPCTimeout(40 * time.Minute) -WaitForDaemonStart: - for { - select { - case <-s.grp.Ch(): - return nil - default: - _, err := s.daemon.WalletBalance() - if err == nil { - break WaitForDaemonStart - } - time.Sleep(5 * time.Second) - } + err = s.waitForDaemonStart() + if err != nil { + return err } err = s.doSync() @@ -252,6 +288,69 @@ WaitForDaemonStart: return nil } +func (s *Sync) updateChannelStatus(e *error) { + if *e != nil { + //conditions for which a channel shouldn't be marked as failed + noFailConditions := []string{ + "this youtube channel is being managed by another server", + } + if util.SubstringInSlice((*e).Error(), noFailConditions) { + return + } + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) + if err != nil { + msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) + err = errors.Prefix(msg, err) + *e = errors.Prefix(err.Error(), *e) + } + } else if !s.IsInterrupted() { + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) + if err != nil { + *e = err + } + } +} + +func (s *Sync) waitForDaemonStart() error { + + for { + select { + case <-s.grp.Ch(): + return errors.Err("interrupted during daemon startup") + default: + _, err := s.daemon.WalletBalance() + if err == nil { + return nil + } + time.Sleep(5 * time.Second) + } + } +} +func (s *Sync) stopAndUploadWallet(e *error) { + log.Printf("Stopping daemon") + shutdownErr := stopDaemonViaSystemd() + if shutdownErr != nil { + logShutdownError(shutdownErr) + } else { + // the cli will return long before the daemon effectively stops. we must observe the processes running + // before moving the wallet + waitTimeout := 8 * time.Minute + processDeathError := waitForDaemonProcess(waitTimeout) + if processDeathError != nil { + logShutdownError(processDeathError) + } else { + err := s.uploadWallet() + if err != nil { + if *e == nil { + e = &err + return + } else { + *e = errors.Prefix("failure uploading wallet: ", e) + } + } + } + } +} func logShutdownError(shutdownErr error) { SendErrorToSlack("error shutting down daemon: %v", shutdownErr) SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") @@ -324,6 +423,7 @@ func (s *Sync) startWorker(workerNum int) { "no space left on device", "NotEnoughFunds", "Cannot publish using channel", + "cannot concatenate 'str' and 'NoneType' objects", "more than 90% of the space has been used.", } if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {