diff --git a/cmd/ytsync.go b/cmd/ytsync.go index d5284bb..35fa489 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -94,6 +94,11 @@ func ytSync(cmd *cobra.Command, args []string) { apiToken := os.Getenv("LBRY_API_TOKEN") youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") blobsDir := os.Getenv("BLOBS_DIRECTORY") + lbrycrdString := os.Getenv("LBRYCRD_STRING") + awsS3ID := os.Getenv("AWS_S3_ID") + awsS3Secret := os.Getenv("AWS_S3_SECRET") + awsS3Region := os.Getenv("AWS_S3_REGION") + awsS3Bucket := os.Getenv("AWS_S3_BUCKET") if apiURL == "" { log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API") return @@ -106,6 +111,25 @@ func ytSync(cmd *cobra.Command, args []string) { log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") return } + if awsS3ID == "" { + log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID") + return + } + if awsS3Secret == "" { + log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET") + return + } + if awsS3Region == "" { + log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION") + return + } + if awsS3Bucket == "" { + log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET") + return + } + if lbrycrdString == "" { + log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") + } if blobsDir == "" { usr, err := user.Current() if err != nil { @@ -136,6 +160,11 @@ func ytSync(cmd *cobra.Command, args []string) { BlobsDir: blobsDir, VideosLimit: videosLimit, MaxVideoSize: maxVideoSize, + LbrycrdString: lbrycrdString, + AwsS3ID: awsS3ID, + AwsS3Secret: awsS3Secret, + AwsS3Region: awsS3Region, + AwsS3Bucket: awsS3Bucket, } err := sm.Start() diff --git a/ytsync/manager.go b/ytsync/manager.go index 9e72d90..0827344 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -39,6 +39,11 @@ type SyncManager struct { BlobsDir string VideosLimit int MaxVideoSize int + LbrycrdString string + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string } const ( @@ -154,6 +159,10 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st vals.Add("claim_name", claimName) } if failureReason != "" { + maxReasonLength := 500 + if len(failureReason) > maxReasonLength { + failureReason = failureReason[:500] + } vals.Add("failure_reason", failureReason) } res, _ := http.PostForm(endpoint, vals) @@ -212,6 +221,11 @@ func (s SyncManager) Start() error { TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, Manager: &s, + LbrycrdString: s.LbrycrdString, + AwsS3ID: s.AwsS3ID, + AwsS3Secret: s.AwsS3Secret, + AwsS3Region: s.AwsS3Region, + AwsS3Bucket: s.AwsS3Bucket, } shouldInterruptLoop = true } else { @@ -242,6 +256,11 @@ func (s SyncManager) Start() error { TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, Manager: &s, + LbrycrdString: s.LbrycrdString, + AwsS3ID: s.AwsS3ID, + AwsS3Secret: s.AwsS3Secret, + AwsS3Region: s.AwsS3Region, + AwsS3Bucket: s.AwsS3Bucket, }) } } diff --git a/ytsync/setup.go b/ytsync/setup.go index 7a40ccd..afaa0af 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -104,6 +104,7 @@ func (s *Sync) ensureEnoughUTXOs() error { } target := 40 + slack := int(float32(0.1) * float32(target)) count := 0 for _, utxo := range *utxolist { @@ -112,7 +113,7 @@ func (s *Sync) ensureEnoughUTXOs() error { } } - if count < target { + if count < target-slack { newAddresses := target - count balance, err := s.daemon.WalletBalance() @@ -257,9 +258,18 @@ func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool { func (s *Sync) addCredits(amountToAdd float64) error { log.Printf("Adding %f credits", amountToAdd) - lbrycrdd, err := lbrycrd.NewWithDefaultURL() - if err != nil { - return err + var lbrycrdd *lbrycrd.Client + var err error + if s.LbrycrdString == "" { + lbrycrdd, err = lbrycrd.NewWithDefaultURL() + if err != nil { + return err + } + } else { + lbrycrdd, err = lbrycrd.New(s.LbrycrdString) + if err != nil { + return err + } } addressResp, err := s.daemon.WalletUnusedAddress() @@ -280,6 +290,4 @@ func (s *Sync) addCredits(amountToAdd float64) error { time.Sleep(wait) return nil - //log.Println("Waiting for transaction to be confirmed") - //return s.waitUntilUTXOsConfirmed() } diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index c44d9c6..b01e974 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -17,6 +17,12 @@ import ( "syscall" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" @@ -60,6 +66,11 @@ type Sync struct { TakeOverExistingChannel bool Refill int Manager *SyncManager + LbrycrdString string + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string daemon *jsonrpc.Client claimAddress string @@ -114,6 +125,97 @@ func (s *Sync) IsInterrupted() bool { } } +func (s *Sync) downloadWallet() error { + defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" + defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet" + key := aws.String("/wallets/" + s.YoutubeChannelID) + if os.Getenv("REGTEST") == "true" { + defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" + defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet" + key = aws.String("/regtest/" + s.YoutubeChannelID) + } + + if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { + return errors.Err("default_wallet already exists") + } + + creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") + s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + if err != nil { + return err + } + downloader := s3manager.NewDownloader(s3Session) + out, err := os.Create(defaultTempWalletDir) + if err != nil { + return err + } + defer out.Close() + + bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ + Bucket: aws.String(s.AwsS3Bucket), + Key: key, + }) + if err != nil { + // Casting to the awserr.Error type will allow you to inspect the error + // code returned by the service in code. The error code can be used + // to switch on context specific functionality. In this case a context + // specific error message is printed to the user based on the bucket + // and key existing. + // + // For information on other S3 API error codes see: + // http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if aerr, ok := err.(awserr.Error); ok { + code := aerr.Code() + if code == s3.ErrCodeNoSuchKey { + return errors.Err("wallet not on S3") + } + } + return err + } else if bytesWritten == 0 { + return errors.Err("zero bytes written") + } + + return os.Rename(defaultTempWalletDir, defaultWalletDir) +} + +func (s *Sync) uploadWallet() error { + defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" + key := aws.String("/wallets/" + s.YoutubeChannelID) + if os.Getenv("REGTEST") == "true" { + defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" + key = aws.String("/regtest/" + s.YoutubeChannelID) + } + + if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) { + return errors.Err("default_wallet does not exist") + } + + creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") + s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + if err != nil { + return err + } + + uploader := s3manager.NewUploader(s3Session) + + file, err := os.Open(defaultWalletDir) + if err != nil { + return err + } + defer file.Close() + + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.AwsS3Bucket), + Key: key, + Body: file, + }) + if err != nil { + return err + } + + return os.Remove(defaultWalletDir) +} + func (s *Sync) FullCycle() (e error) { if os.Getenv("HOME") == "" { return errors.Err("no $HOME env var found") @@ -141,67 +243,18 @@ func (s *Sync) FullCycle() (e error) { s.syncedVideos = syncedVideos s.syncedVideosMux.Unlock() - defer func() { - if e != nil { - //conditions for which a channel shouldn't be marked as failed - noFailConditions := []string{ - "this youtube channel is being managed by another server", - } - if util.SubstringInSlice(e.Error(), noFailConditions) { - return - } - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) - if err != nil { - msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) - err = errors.Prefix(msg, err) - e = errors.Prefix(err.Error(), e) - } - } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) - if err != nil { - e = err - } - } - }() + defer s.updateChannelStatus(&e) - defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" - if os.Getenv("REGTEST") == "true" { - defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" - } - walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) - - if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { - return errors.Err("default_wallet already exists") - } - - if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) { - err = os.Rename(walletBackupDir, defaultWalletDir) - if err != nil { - return errors.Wrap(err, 0) - } + err = s.downloadWallet() + if err != nil && err.Error() != "wallet not on S3" { + return errors.Prefix("failure in downloading wallet: ", err) + } else if err == nil { log.Println("Continuing previous upload") + } else { + log.Println("Starting new wallet") } - defer func() { - log.Printf("Stopping daemon") - shutdownErr := stopDaemonViaSystemd() - if shutdownErr != nil { - logShutdownError(shutdownErr) - } else { - // the cli will return long before the daemon effectively stops. we must observe the processes running - // before moving the wallet - waitTimeout := 8 * time.Minute - processDeathError := waitForDaemonProcess(waitTimeout) - if processDeathError != nil { - logShutdownError(processDeathError) - } else { - walletErr := os.Rename(defaultWalletDir, walletBackupDir) - if walletErr != nil { - log.Errorf("error moving wallet to backup dir: %v", walletErr) - } - } - } - }() + defer s.stopAndUploadWallet(&e) s.videoDirectory, err = ioutil.TempDir("", "ytsync") if err != nil { @@ -218,18 +271,9 @@ func (s *Sync) FullCycle() (e error) { s.daemon = jsonrpc.NewClient("") s.daemon.SetRPCTimeout(40 * time.Minute) -WaitForDaemonStart: - for { - select { - case <-s.grp.Ch(): - return nil - default: - _, err := s.daemon.WalletBalance() - if err == nil { - break WaitForDaemonStart - } - time.Sleep(5 * time.Second) - } + err = s.waitForDaemonStart() + if err != nil { + return err } err = s.doSync() @@ -244,6 +288,69 @@ WaitForDaemonStart: return nil } +func (s *Sync) updateChannelStatus(e *error) { + if *e != nil { + //conditions for which a channel shouldn't be marked as failed + noFailConditions := []string{ + "this youtube channel is being managed by another server", + } + if util.SubstringInSlice((*e).Error(), noFailConditions) { + return + } + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) + if err != nil { + msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) + err = errors.Prefix(msg, err) + *e = errors.Prefix(err.Error(), *e) + } + } else if !s.IsInterrupted() { + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) + if err != nil { + *e = err + } + } +} + +func (s *Sync) waitForDaemonStart() error { + + for { + select { + case <-s.grp.Ch(): + return errors.Err("interrupted during daemon startup") + default: + _, err := s.daemon.WalletBalance() + if err == nil { + return nil + } + time.Sleep(5 * time.Second) + } + } +} +func (s *Sync) stopAndUploadWallet(e *error) { + log.Printf("Stopping daemon") + shutdownErr := stopDaemonViaSystemd() + if shutdownErr != nil { + logShutdownError(shutdownErr) + } else { + // the cli will return long before the daemon effectively stops. we must observe the processes running + // before moving the wallet + waitTimeout := 8 * time.Minute + processDeathError := waitForDaemonProcess(waitTimeout) + if processDeathError != nil { + logShutdownError(processDeathError) + } else { + err := s.uploadWallet() + if err != nil { + if *e == nil { + e = &err + return + } else { + *e = errors.Prefix("failure uploading wallet: ", e) + } + } + } + } +} func logShutdownError(shutdownErr error) { SendErrorToSlack("error shutting down daemon: %v", shutdownErr) SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") @@ -316,6 +423,7 @@ func (s *Sync) startWorker(workerNum int) { "no space left on device", "NotEnoughFunds", "Cannot publish using channel", + "cannot concatenate 'str' and 'NoneType' objects", "more than 90% of the space has been used.", } if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { @@ -337,14 +445,21 @@ func (s *Sync) startWorker(workerNum int) { log.Println("This error should not be retried at all") } else if tryCount < s.MaxTries { if strings.Contains(err.Error(), "txn-mempool-conflict") || - strings.Contains(err.Error(), "failed: Not enough funds") || - strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") || strings.Contains(err.Error(), "too-long-mempool-chain") { - log.Println("waiting for a block and refilling addresses before retrying") + log.Println("waiting for a block before retrying") + err = s.waitForNewBlock() + if err != nil { + s.grp.Stop() + SendErrorToSlack("something went wrong while waiting for a block: %v", err) + break + } + } else if strings.Contains(err.Error(), "failed: Not enough funds") || + strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") { + log.Println("refilling addresses before retrying") err = s.walletSetup() if err != nil { s.grp.Stop() - SendErrorToSlack("Failed to setup the wallet for a refill: %v", err) + SendErrorToSlack("failed to setup the wallet for a refill: %v", err) break } }