add support for S3 wallets

remove local wallets support
refactored startup and shutdown functions
This commit is contained in:
Niko Storni 2018-08-09 16:54:23 -04:00
parent dafce6ae84
commit ea7e2707d0
2 changed files with 188 additions and 76 deletions

View file

@ -40,6 +40,10 @@ type SyncManager struct {
VideosLimit int VideosLimit int
MaxVideoSize int MaxVideoSize int
LbrycrdString string LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
} }
const ( const (
@ -214,6 +218,10 @@ func (s SyncManager) Start() error {
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: &s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
} }
shouldInterruptLoop = true shouldInterruptLoop = true
} else { } else {
@ -245,6 +253,10 @@ func (s SyncManager) Start() error {
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: &s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
}) })
} }
} }

252
ytsync.go
View file

@ -17,6 +17,12 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/stop"
@ -61,6 +67,10 @@ type Sync struct {
Refill int Refill int
Manager *SyncManager Manager *SyncManager
LbrycrdString string LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
daemon *jsonrpc.Client daemon *jsonrpc.Client
claimAddress string claimAddress string
@ -115,6 +125,97 @@ func (s *Sync) IsInterrupted() bool {
} }
} }
func (s *Sync) downloadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s3Session)
out, err := os.Create(defaultTempWalletDir)
if err != nil {
return err
}
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
})
if err != nil {
// Casting to the awserr.Error type will allow you to inspect the error
// code returned by the service in code. The error code can be used
// to switch on context specific functionality. In this case a context
// specific error message is printed to the user based on the bucket
// and key existing.
//
// For information on other S3 API error codes see:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if aerr, ok := err.(awserr.Error); ok {
code := aerr.Code()
if code == s3.ErrCodeNoSuchKey {
return errors.Err("wallet not on S3")
}
}
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return os.Rename(defaultTempWalletDir, defaultWalletDir)
}
func (s *Sync) uploadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) {
return errors.Err("default_wallet does not exist")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s3Session)
file, err := os.Open(defaultWalletDir)
if err != nil {
return err
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
Body: file,
})
if err != nil {
return err
}
return os.Remove(defaultWalletDir)
}
func (s *Sync) FullCycle() (e error) { func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" { if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found") return errors.Err("no $HOME env var found")
@ -142,74 +243,18 @@ func (s *Sync) FullCycle() (e error) {
s.syncedVideos = syncedVideos s.syncedVideos = syncedVideos
s.syncedVideosMux.Unlock() s.syncedVideosMux.Unlock()
defer func() { defer s.updateChannelStatus(&e)
if e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice(e.Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
e = err
}
}
}()
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" err = s.downloadWallet()
if os.Getenv("REGTEST") == "true" { if err != nil && err.Error() != "wallet not on S3" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" return errors.Prefix("failure in downloading wallet: ", err)
} } else if err == nil {
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
newWalletBackupDir := os.Getenv("HOME") + "/renamed_wallets/" + s.YoutubeChannelID
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
err = os.Rename(walletBackupDir, defaultWalletDir)
if err != nil {
return errors.Wrap(err, 0)
}
log.Println("Continuing previous upload")
} else if _, err = os.Stat(newWalletBackupDir); !os.IsNotExist(err) {
err = os.Rename(newWalletBackupDir, defaultWalletDir)
if err != nil {
return errors.Wrap(err, 0)
}
log.Println("Continuing previous upload") log.Println("Continuing previous upload")
} else {
log.Println("Starting new wallet")
} }
defer func() { defer s.stopAndUploadWallet(&e)
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
walletErr := os.Rename(defaultWalletDir, newWalletBackupDir)
if walletErr != nil {
log.Errorf("error moving wallet to backup dir: %v", walletErr)
}
}
}
}()
s.videoDirectory, err = ioutil.TempDir("", "ytsync") s.videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil { if err != nil {
@ -226,18 +271,9 @@ func (s *Sync) FullCycle() (e error) {
s.daemon = jsonrpc.NewClient("") s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(40 * time.Minute) s.daemon.SetRPCTimeout(40 * time.Minute)
WaitForDaemonStart: err = s.waitForDaemonStart()
for { if err != nil {
select { return err
case <-s.grp.Ch():
return nil
default:
_, err := s.daemon.WalletBalance()
if err == nil {
break WaitForDaemonStart
}
time.Sleep(5 * time.Second)
}
} }
err = s.doSync() err = s.doSync()
@ -252,6 +288,69 @@ WaitForDaemonStart:
return nil return nil
} }
func (s *Sync) updateChannelStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
*e = err
}
}
}
func (s *Sync) waitForDaemonStart() error {
for {
select {
case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup")
default:
_, err := s.daemon.WalletBalance()
if err == nil {
return nil
}
time.Sleep(5 * time.Second)
}
}
}
func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err
return
} else {
*e = errors.Prefix("failure uploading wallet: ", e)
}
}
}
}
}
func logShutdownError(shutdownErr error) { func logShutdownError(shutdownErr error) {
SendErrorToSlack("error shutting down daemon: %v", shutdownErr) SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
@ -324,6 +423,7 @@ func (s *Sync) startWorker(workerNum int) {
"no space left on device", "no space left on device",
"NotEnoughFunds", "NotEnoughFunds",
"Cannot publish using channel", "Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.", "more than 90% of the space has been used.",
} }
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {