lbrynet/lbrycrd wallet management #34

Merged
nikooo777 merged 4 commits from wallets-refactor into master 2018-08-20 14:09:06 +02:00
4 changed files with 250 additions and 79 deletions

View file

@ -94,6 +94,11 @@ func ytSync(cmd *cobra.Command, args []string) {
apiToken := os.Getenv("LBRY_API_TOKEN") apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
blobsDir := os.Getenv("BLOBS_DIRECTORY") blobsDir := os.Getenv("BLOBS_DIRECTORY")
lbrycrdString := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" { if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API") log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API")
return return
@ -106,6 +111,25 @@ func ytSync(cmd *cobra.Command, args []string) {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return return
} }
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdString == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
if blobsDir == "" { if blobsDir == "" {
usr, err := user.Current() usr, err := user.Current()
if err != nil { if err != nil {
@ -136,6 +160,11 @@ func ytSync(cmd *cobra.Command, args []string) {
BlobsDir: blobsDir, BlobsDir: blobsDir,
VideosLimit: videosLimit, VideosLimit: videosLimit,
MaxVideoSize: maxVideoSize, MaxVideoSize: maxVideoSize,
LbrycrdString: lbrycrdString,
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
} }
err := sm.Start() err := sm.Start()

View file

@ -39,6 +39,11 @@ type SyncManager struct {
BlobsDir string BlobsDir string
VideosLimit int VideosLimit int
MaxVideoSize int MaxVideoSize int
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
} }
const ( const (
@ -154,6 +159,10 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
vals.Add("claim_name", claimName) vals.Add("claim_name", claimName)
} }
if failureReason != "" { if failureReason != "" {
maxReasonLength := 500
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:500]
}
vals.Add("failure_reason", failureReason) vals.Add("failure_reason", failureReason)
} }
res, _ := http.PostForm(endpoint, vals) res, _ := http.PostForm(endpoint, vals)
@ -212,6 +221,11 @@ func (s SyncManager) Start() error {
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: &s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
} }
shouldInterruptLoop = true shouldInterruptLoop = true
} else { } else {
@ -242,6 +256,11 @@ func (s SyncManager) Start() error {
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill, Refill: s.Refill,
Manager: &s, Manager: &s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
}) })
} }
} }

View file

@ -104,6 +104,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
} }
target := 40 target := 40
slack := int(float32(0.1) * float32(target))
count := 0 count := 0
for _, utxo := range *utxolist { for _, utxo := range *utxolist {
@ -112,7 +113,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
} }
} }
if count < target { if count < target-slack {
newAddresses := target - count newAddresses := target - count
balance, err := s.daemon.WalletBalance() balance, err := s.daemon.WalletBalance()
@ -257,9 +258,18 @@ func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool {
func (s *Sync) addCredits(amountToAdd float64) error { func (s *Sync) addCredits(amountToAdd float64) error {
log.Printf("Adding %f credits", amountToAdd) log.Printf("Adding %f credits", amountToAdd)
lbrycrdd, err := lbrycrd.NewWithDefaultURL() var lbrycrdd *lbrycrd.Client
if err != nil { var err error
return err if s.LbrycrdString == "" {
lbrycrdd, err = lbrycrd.NewWithDefaultURL()
if err != nil {
return err
}
} else {
lbrycrdd, err = lbrycrd.New(s.LbrycrdString)
if err != nil {
return err
}
} }
addressResp, err := s.daemon.WalletUnusedAddress() addressResp, err := s.daemon.WalletUnusedAddress()
@ -280,6 +290,4 @@ func (s *Sync) addCredits(amountToAdd float64) error {
time.Sleep(wait) time.Sleep(wait)
return nil return nil
//log.Println("Waiting for transaction to be confirmed")
//return s.waitUntilUTXOsConfirmed()
} }

View file

@ -17,6 +17,12 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/stop"
@ -60,6 +66,11 @@ type Sync struct {
TakeOverExistingChannel bool TakeOverExistingChannel bool
Refill int Refill int
Manager *SyncManager Manager *SyncManager
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
daemon *jsonrpc.Client daemon *jsonrpc.Client
claimAddress string claimAddress string
@ -114,6 +125,97 @@ func (s *Sync) IsInterrupted() bool {
} }
} }
func (s *Sync) downloadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s3Session)
out, err := os.Create(defaultTempWalletDir)
if err != nil {
return err
}
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
})
if err != nil {
// Casting to the awserr.Error type will allow you to inspect the error
// code returned by the service in code. The error code can be used
// to switch on context specific functionality. In this case a context
// specific error message is printed to the user based on the bucket
// and key existing.
//
// For information on other S3 API error codes see:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if aerr, ok := err.(awserr.Error); ok {
code := aerr.Code()
if code == s3.ErrCodeNoSuchKey {
return errors.Err("wallet not on S3")
}
}
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return os.Rename(defaultTempWalletDir, defaultWalletDir)
}
func (s *Sync) uploadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) {
return errors.Err("default_wallet does not exist")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s3Session)
file, err := os.Open(defaultWalletDir)
if err != nil {
return err
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
Body: file,
})
if err != nil {
return err
}
return os.Remove(defaultWalletDir)
}
func (s *Sync) FullCycle() (e error) { func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" { if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found") return errors.Err("no $HOME env var found")
@ -141,67 +243,18 @@ func (s *Sync) FullCycle() (e error) {
s.syncedVideos = syncedVideos s.syncedVideos = syncedVideos
s.syncedVideosMux.Unlock() s.syncedVideosMux.Unlock()
defer func() { defer s.updateChannelStatus(&e)
if e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice(e.Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
e = err
}
}
}()
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" err = s.downloadWallet()
if os.Getenv("REGTEST") == "true" { if err != nil && err.Error() != "wallet not on S3" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" return errors.Prefix("failure in downloading wallet: ", err)
} } else if err == nil {
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
err = os.Rename(walletBackupDir, defaultWalletDir)
if err != nil {
return errors.Wrap(err, 0)
}
log.Println("Continuing previous upload") log.Println("Continuing previous upload")
} else {
log.Println("Starting new wallet")
} }
defer func() { defer s.stopAndUploadWallet(&e)
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
walletErr := os.Rename(defaultWalletDir, walletBackupDir)
if walletErr != nil {
log.Errorf("error moving wallet to backup dir: %v", walletErr)
}
}
}
}()
s.videoDirectory, err = ioutil.TempDir("", "ytsync") s.videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil { if err != nil {
@ -218,18 +271,9 @@ func (s *Sync) FullCycle() (e error) {
s.daemon = jsonrpc.NewClient("") s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(40 * time.Minute) s.daemon.SetRPCTimeout(40 * time.Minute)
WaitForDaemonStart: err = s.waitForDaemonStart()
for { if err != nil {
select { return err
case <-s.grp.Ch():
return nil
default:
_, err := s.daemon.WalletBalance()
if err == nil {
break WaitForDaemonStart
}
time.Sleep(5 * time.Second)
}
} }
err = s.doSync() err = s.doSync()
@ -244,6 +288,69 @@ WaitForDaemonStart:
return nil return nil
} }
func (s *Sync) updateChannelStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
*e = err
}
}
}
func (s *Sync) waitForDaemonStart() error {
for {
select {
case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup")
default:
_, err := s.daemon.WalletBalance()
if err == nil {
return nil
}
time.Sleep(5 * time.Second)
}
}
}
func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err
return
} else {
*e = errors.Prefix("failure uploading wallet: ", e)
}
}
}
}
}
func logShutdownError(shutdownErr error) { func logShutdownError(shutdownErr error) {
SendErrorToSlack("error shutting down daemon: %v", shutdownErr) SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
@ -316,6 +423,7 @@ func (s *Sync) startWorker(workerNum int) {
"no space left on device", "no space left on device",
"NotEnoughFunds", "NotEnoughFunds",
"Cannot publish using channel", "Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.", "more than 90% of the space has been used.",
} }
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
@ -337,14 +445,21 @@ func (s *Sync) startWorker(workerNum int) {
log.Println("This error should not be retried at all") log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries { } else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "txn-mempool-conflict") || if strings.Contains(err.Error(), "txn-mempool-conflict") ||
strings.Contains(err.Error(), "failed: Not enough funds") ||
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") ||
strings.Contains(err.Error(), "too-long-mempool-chain") { strings.Contains(err.Error(), "too-long-mempool-chain") {
log.Println("waiting for a block and refilling addresses before retrying") log.Println("waiting for a block before retrying")
err = s.waitForNewBlock()
if err != nil {
s.grp.Stop()
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
break
}
} else if strings.Contains(err.Error(), "failed: Not enough funds") ||
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") {
log.Println("refilling addresses before retrying")
err = s.walletSetup() err = s.walletSetup()
if err != nil { if err != nil {
s.grp.Stop() s.grp.Stop()
SendErrorToSlack("Failed to setup the wallet for a refill: %v", err) SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break break
} }
} }