From f434232caea0e1fbb622e76f79143b7d22a5ea9c Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 8 Aug 2018 17:59:59 -0400 Subject: [PATCH 1/4] use channelIDs instead of channel names add support for custom lbrycrd instance --- cmd/ytsync.go | 5 +++++ ytsync/manager.go | 3 +++ ytsync/setup.go | 20 ++++++++++++++------ ytsync/ytsync.go | 10 +++++++++- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/cmd/ytsync.go b/cmd/ytsync.go index d5284bb..6e0fd87 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -94,6 +94,7 @@ func ytSync(cmd *cobra.Command, args []string) { apiToken := os.Getenv("LBRY_API_TOKEN") youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") blobsDir := os.Getenv("BLOBS_DIRECTORY") + lbrycrdString := os.Getenv("LBRYCRD_STRING") if apiURL == "" { log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API") return @@ -106,6 +107,9 @@ func ytSync(cmd *cobra.Command, args []string) { log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") return } + if lbrycrdString == "" { + log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") + } if blobsDir == "" { usr, err := user.Current() if err != nil { @@ -136,6 +140,7 @@ func ytSync(cmd *cobra.Command, args []string) { BlobsDir: blobsDir, VideosLimit: videosLimit, MaxVideoSize: maxVideoSize, + LbrycrdString: lbrycrdString, } err := sm.Start() diff --git a/ytsync/manager.go b/ytsync/manager.go index 9e72d90..825ddb5 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -39,6 +39,7 @@ type SyncManager struct { BlobsDir string VideosLimit int MaxVideoSize int + LbrycrdString string } const ( @@ -212,6 +213,7 @@ func (s SyncManager) Start() error { TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, Manager: &s, + LbrycrdString: s.LbrycrdString, } shouldInterruptLoop = true } else { @@ -242,6 +244,7 @@ func (s SyncManager) Start() error { TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, Manager: &s, + LbrycrdString: s.LbrycrdString, }) } } diff --git a/ytsync/setup.go b/ytsync/setup.go index 7a40ccd..e154f16 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -104,6 +104,7 @@ func (s *Sync) ensureEnoughUTXOs() error { } target := 40 + slack := target - int(float32(0.05)*float32(target)) count := 0 for _, utxo := range *utxolist { @@ -112,7 +113,7 @@ func (s *Sync) ensureEnoughUTXOs() error { } } - if count < target { + if count < target-slack { newAddresses := target - count balance, err := s.daemon.WalletBalance() @@ -257,9 +258,18 @@ func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool { func (s *Sync) addCredits(amountToAdd float64) error { log.Printf("Adding %f credits", amountToAdd) - lbrycrdd, err := lbrycrd.NewWithDefaultURL() - if err != nil { - return err + var lbrycrdd *lbrycrd.Client + var err error + if s.LbrycrdString == "" { + lbrycrdd, err = lbrycrd.NewWithDefaultURL() + if err != nil { + return err + } + } else { + lbrycrdd, err = lbrycrd.New(s.LbrycrdString) + if err != nil { + return err + } } addressResp, err := s.daemon.WalletUnusedAddress() @@ -280,6 +290,4 @@ func (s *Sync) addCredits(amountToAdd float64) error { time.Sleep(wait) return nil - //log.Println("Waiting for transaction to be confirmed") - //return s.waitUntilUTXOsConfirmed() } diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index c44d9c6..70dee4e 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -60,6 +60,7 @@ type Sync struct { TakeOverExistingChannel bool Refill int Manager *SyncManager + LbrycrdString string daemon *jsonrpc.Client claimAddress string @@ -169,6 +170,7 @@ func (s *Sync) FullCycle() (e error) { defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" } walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) + newWalletBackupDir := os.Getenv("HOME") + "/renamed_wallets/" + s.YoutubeChannelID if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { return errors.Err("default_wallet already exists") @@ -180,6 +182,12 @@ func (s *Sync) FullCycle() (e error) { return errors.Wrap(err, 0) } log.Println("Continuing previous upload") + } else if _, err = os.Stat(newWalletBackupDir); !os.IsNotExist(err) { + err = os.Rename(newWalletBackupDir, defaultWalletDir) + if err != nil { + return errors.Wrap(err, 0) + } + log.Println("Continuing previous upload") } defer func() { @@ -195,7 +203,7 @@ func (s *Sync) FullCycle() (e error) { if processDeathError != nil { logShutdownError(processDeathError) } else { - walletErr := os.Rename(defaultWalletDir, walletBackupDir) + walletErr := os.Rename(defaultWalletDir, newWalletBackupDir) if walletErr != nil { log.Errorf("error moving wallet to backup dir: %v", walletErr) } -- 2.45.3 From b6d71e385ab625b95007eae10df8ec79bcb04e24 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Thu, 9 Aug 2018 16:54:23 -0400 Subject: [PATCH 2/4] add support for S3 wallets remove local wallets support refactored startup and shutdown functions --- cmd/ytsync.go | 24 +++++ ytsync/manager.go | 12 +++ ytsync/ytsync.go | 252 ++++++++++++++++++++++++++++++++-------------- 3 files changed, 212 insertions(+), 76 deletions(-) diff --git a/cmd/ytsync.go b/cmd/ytsync.go index 6e0fd87..35fa489 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -95,6 +95,10 @@ func ytSync(cmd *cobra.Command, args []string) { youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") blobsDir := os.Getenv("BLOBS_DIRECTORY") lbrycrdString := os.Getenv("LBRYCRD_STRING") + awsS3ID := os.Getenv("AWS_S3_ID") + awsS3Secret := os.Getenv("AWS_S3_SECRET") + awsS3Region := os.Getenv("AWS_S3_REGION") + awsS3Bucket := os.Getenv("AWS_S3_BUCKET") if apiURL == "" { log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API") return @@ -107,6 +111,22 @@ func ytSync(cmd *cobra.Command, args []string) { log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") return } + if awsS3ID == "" { + log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID") + return + } + if awsS3Secret == "" { + log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET") + return + } + if awsS3Region == "" { + log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION") + return + } + if awsS3Bucket == "" { + log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET") + return + } if lbrycrdString == "" { log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") } @@ -141,6 +161,10 @@ func ytSync(cmd *cobra.Command, args []string) { VideosLimit: videosLimit, MaxVideoSize: maxVideoSize, LbrycrdString: lbrycrdString, + AwsS3ID: awsS3ID, + AwsS3Secret: awsS3Secret, + AwsS3Region: awsS3Region, + AwsS3Bucket: awsS3Bucket, } err := sm.Start() diff --git a/ytsync/manager.go b/ytsync/manager.go index 825ddb5..bc38810 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -40,6 +40,10 @@ type SyncManager struct { VideosLimit int MaxVideoSize int LbrycrdString string + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string } const ( @@ -214,6 +218,10 @@ func (s SyncManager) Start() error { Refill: s.Refill, Manager: &s, LbrycrdString: s.LbrycrdString, + AwsS3ID: s.AwsS3ID, + AwsS3Secret: s.AwsS3Secret, + AwsS3Region: s.AwsS3Region, + AwsS3Bucket: s.AwsS3Bucket, } shouldInterruptLoop = true } else { @@ -245,6 +253,10 @@ func (s SyncManager) Start() error { Refill: s.Refill, Manager: &s, LbrycrdString: s.LbrycrdString, + AwsS3ID: s.AwsS3ID, + AwsS3Secret: s.AwsS3Secret, + AwsS3Region: s.AwsS3Region, + AwsS3Bucket: s.AwsS3Bucket, }) } } diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 70dee4e..808b4f6 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -17,6 +17,12 @@ import ( "syscall" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" @@ -61,6 +67,10 @@ type Sync struct { Refill int Manager *SyncManager LbrycrdString string + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string daemon *jsonrpc.Client claimAddress string @@ -115,6 +125,97 @@ func (s *Sync) IsInterrupted() bool { } } +func (s *Sync) downloadWallet() error { + defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" + defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet" + key := aws.String("/wallets/" + s.YoutubeChannelID) + if os.Getenv("REGTEST") == "true" { + defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" + defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet" + key = aws.String("/regtest/" + s.YoutubeChannelID) + } + + if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { + return errors.Err("default_wallet already exists") + } + + creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") + s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + if err != nil { + return err + } + downloader := s3manager.NewDownloader(s3Session) + out, err := os.Create(defaultTempWalletDir) + if err != nil { + return err + } + defer out.Close() + + bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ + Bucket: aws.String(s.AwsS3Bucket), + Key: key, + }) + if err != nil { + // Casting to the awserr.Error type will allow you to inspect the error + // code returned by the service in code. The error code can be used + // to switch on context specific functionality. In this case a context + // specific error message is printed to the user based on the bucket + // and key existing. + // + // For information on other S3 API error codes see: + // http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if aerr, ok := err.(awserr.Error); ok { + code := aerr.Code() + if code == s3.ErrCodeNoSuchKey { + return errors.Err("wallet not on S3") + } + } + return err + } else if bytesWritten == 0 { + return errors.Err("zero bytes written") + } + + return os.Rename(defaultTempWalletDir, defaultWalletDir) +} + +func (s *Sync) uploadWallet() error { + defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" + key := aws.String("/wallets/" + s.YoutubeChannelID) + if os.Getenv("REGTEST") == "true" { + defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" + key = aws.String("/regtest/" + s.YoutubeChannelID) + } + + if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) { + return errors.Err("default_wallet does not exist") + } + + creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") + s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + if err != nil { + return err + } + + uploader := s3manager.NewUploader(s3Session) + + file, err := os.Open(defaultWalletDir) + if err != nil { + return err + } + defer file.Close() + + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.AwsS3Bucket), + Key: key, + Body: file, + }) + if err != nil { + return err + } + + return os.Remove(defaultWalletDir) +} + func (s *Sync) FullCycle() (e error) { if os.Getenv("HOME") == "" { return errors.Err("no $HOME env var found") @@ -142,74 +243,18 @@ func (s *Sync) FullCycle() (e error) { s.syncedVideos = syncedVideos s.syncedVideosMux.Unlock() - defer func() { - if e != nil { - //conditions for which a channel shouldn't be marked as failed - noFailConditions := []string{ - "this youtube channel is being managed by another server", - } - if util.SubstringInSlice(e.Error(), noFailConditions) { - return - } - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) - if err != nil { - msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) - err = errors.Prefix(msg, err) - e = errors.Prefix(err.Error(), e) - } - } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) - if err != nil { - e = err - } - } - }() + defer s.updateChannelStatus(&e) - defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" - if os.Getenv("REGTEST") == "true" { - defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" - } - walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) - newWalletBackupDir := os.Getenv("HOME") + "/renamed_wallets/" + s.YoutubeChannelID - - if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { - return errors.Err("default_wallet already exists") - } - - if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) { - err = os.Rename(walletBackupDir, defaultWalletDir) - if err != nil { - return errors.Wrap(err, 0) - } - log.Println("Continuing previous upload") - } else if _, err = os.Stat(newWalletBackupDir); !os.IsNotExist(err) { - err = os.Rename(newWalletBackupDir, defaultWalletDir) - if err != nil { - return errors.Wrap(err, 0) - } + err = s.downloadWallet() + if err != nil && err.Error() != "wallet not on S3" { + return errors.Prefix("failure in downloading wallet: ", err) + } else if err == nil { log.Println("Continuing previous upload") + } else { + log.Println("Starting new wallet") } - defer func() { - log.Printf("Stopping daemon") - shutdownErr := stopDaemonViaSystemd() - if shutdownErr != nil { - logShutdownError(shutdownErr) - } else { - // the cli will return long before the daemon effectively stops. we must observe the processes running - // before moving the wallet - waitTimeout := 8 * time.Minute - processDeathError := waitForDaemonProcess(waitTimeout) - if processDeathError != nil { - logShutdownError(processDeathError) - } else { - walletErr := os.Rename(defaultWalletDir, newWalletBackupDir) - if walletErr != nil { - log.Errorf("error moving wallet to backup dir: %v", walletErr) - } - } - } - }() + defer s.stopAndUploadWallet(&e) s.videoDirectory, err = ioutil.TempDir("", "ytsync") if err != nil { @@ -226,18 +271,9 @@ func (s *Sync) FullCycle() (e error) { s.daemon = jsonrpc.NewClient("") s.daemon.SetRPCTimeout(40 * time.Minute) -WaitForDaemonStart: - for { - select { - case <-s.grp.Ch(): - return nil - default: - _, err := s.daemon.WalletBalance() - if err == nil { - break WaitForDaemonStart - } - time.Sleep(5 * time.Second) - } + err = s.waitForDaemonStart() + if err != nil { + return err } err = s.doSync() @@ -252,6 +288,69 @@ WaitForDaemonStart: return nil } +func (s *Sync) updateChannelStatus(e *error) { + if *e != nil { + //conditions for which a channel shouldn't be marked as failed + noFailConditions := []string{ + "this youtube channel is being managed by another server", + } + if util.SubstringInSlice((*e).Error(), noFailConditions) { + return + } + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) + if err != nil { + msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) + err = errors.Prefix(msg, err) + *e = errors.Prefix(err.Error(), *e) + } + } else if !s.IsInterrupted() { + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) + if err != nil { + *e = err + } + } +} + +func (s *Sync) waitForDaemonStart() error { + + for { + select { + case <-s.grp.Ch(): + return errors.Err("interrupted during daemon startup") + default: + _, err := s.daemon.WalletBalance() + if err == nil { + return nil + } + time.Sleep(5 * time.Second) + } + } +} +func (s *Sync) stopAndUploadWallet(e *error) { + log.Printf("Stopping daemon") + shutdownErr := stopDaemonViaSystemd() + if shutdownErr != nil { + logShutdownError(shutdownErr) + } else { + // the cli will return long before the daemon effectively stops. we must observe the processes running + // before moving the wallet + waitTimeout := 8 * time.Minute + processDeathError := waitForDaemonProcess(waitTimeout) + if processDeathError != nil { + logShutdownError(processDeathError) + } else { + err := s.uploadWallet() + if err != nil { + if *e == nil { + e = &err + return + } else { + *e = errors.Prefix("failure uploading wallet: ", e) + } + } + } + } +} func logShutdownError(shutdownErr error) { SendErrorToSlack("error shutting down daemon: %v", shutdownErr) SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") @@ -324,6 +423,7 @@ func (s *Sync) startWorker(workerNum int) { "no space left on device", "NotEnoughFunds", "Cannot publish using channel", + "cannot concatenate 'str' and 'NoneType' objects", "more than 90% of the space has been used.", } if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { -- 2.45.3 From 758b68c7515bc0d301e19f9b634f7147bf2ba344 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Thu, 9 Aug 2018 20:04:39 -0400 Subject: [PATCH 3/4] various bug fixes --- ytsync/manager.go | 4 ++++ ytsync/ytsync.go | 15 +++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index bc38810..0827344 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -159,6 +159,10 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st vals.Add("claim_name", claimName) } if failureReason != "" { + maxReasonLength := 500 + if len(failureReason) > maxReasonLength { + failureReason = failureReason[:500] + } vals.Add("failure_reason", failureReason) } res, _ := http.PostForm(endpoint, vals) diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 808b4f6..b01e974 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -445,14 +445,21 @@ func (s *Sync) startWorker(workerNum int) { log.Println("This error should not be retried at all") } else if tryCount < s.MaxTries { if strings.Contains(err.Error(), "txn-mempool-conflict") || - strings.Contains(err.Error(), "failed: Not enough funds") || - strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") || strings.Contains(err.Error(), "too-long-mempool-chain") { - log.Println("waiting for a block and refilling addresses before retrying") + log.Println("waiting for a block before retrying") + err = s.waitForNewBlock() + if err != nil { + s.grp.Stop() + SendErrorToSlack("something went wrong while waiting for a block: %v", err) + break + } + } else if strings.Contains(err.Error(), "failed: Not enough funds") || + strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") { + log.Println("refilling addresses before retrying") err = s.walletSetup() if err != nil { s.grp.Stop() - SendErrorToSlack("Failed to setup the wallet for a refill: %v", err) + SendErrorToSlack("failed to setup the wallet for a refill: %v", err) break } } -- 2.45.3 From 20876362e40ee579b93271111e8645f18fd9af6a Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Thu, 9 Aug 2018 20:59:52 -0400 Subject: [PATCH 4/4] fix bug in prefill --- ytsync/setup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ytsync/setup.go b/ytsync/setup.go index e154f16..afaa0af 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -104,7 +104,7 @@ func (s *Sync) ensureEnoughUTXOs() error { } target := 40 - slack := target - int(float32(0.05)*float32(target)) + slack := int(float32(0.1) * float32(target)) count := 0 for _, utxo := range *utxolist { -- 2.45.3