Merge pull request #34 from lbryio/wallets-refactor

lbrynet/lbrycrd wallet management
This commit is contained in:
Niko 2018-08-20 14:09:05 +02:00 committed by GitHub
commit 9eabd4ad0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 250 additions and 79 deletions

View file

@ -94,6 +94,11 @@ func ytSync(cmd *cobra.Command, args []string) {
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
blobsDir := os.Getenv("BLOBS_DIRECTORY")
lbrycrdString := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API")
return
@ -106,6 +111,25 @@ func ytSync(cmd *cobra.Command, args []string) {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdString == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
if blobsDir == "" {
usr, err := user.Current()
if err != nil {
@ -136,6 +160,11 @@ func ytSync(cmd *cobra.Command, args []string) {
BlobsDir: blobsDir,
VideosLimit: videosLimit,
MaxVideoSize: maxVideoSize,
LbrycrdString: lbrycrdString,
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
err := sm.Start()

View file

@ -39,6 +39,11 @@ type SyncManager struct {
BlobsDir string
VideosLimit int
MaxVideoSize int
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
}
const (
@ -154,6 +159,10 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
vals.Add("claim_name", claimName)
}
if failureReason != "" {
maxReasonLength := 500
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:500]
}
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
@ -212,6 +221,11 @@ func (s SyncManager) Start() error {
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
}
shouldInterruptLoop = true
} else {
@ -242,6 +256,11 @@ func (s SyncManager) Start() error {
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
Manager: &s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
})
}
}

View file

@ -104,6 +104,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
}
target := 40
slack := int(float32(0.1) * float32(target))
count := 0
for _, utxo := range *utxolist {
@ -112,7 +113,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
}
}
if count < target {
if count < target-slack {
newAddresses := target - count
balance, err := s.daemon.WalletBalance()
@ -257,9 +258,18 @@ func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool {
func (s *Sync) addCredits(amountToAdd float64) error {
log.Printf("Adding %f credits", amountToAdd)
lbrycrdd, err := lbrycrd.NewWithDefaultURL()
if err != nil {
return err
var lbrycrdd *lbrycrd.Client
var err error
if s.LbrycrdString == "" {
lbrycrdd, err = lbrycrd.NewWithDefaultURL()
if err != nil {
return err
}
} else {
lbrycrdd, err = lbrycrd.New(s.LbrycrdString)
if err != nil {
return err
}
}
addressResp, err := s.daemon.WalletUnusedAddress()
@ -280,6 +290,4 @@ func (s *Sync) addCredits(amountToAdd float64) error {
time.Sleep(wait)
return nil
//log.Println("Waiting for transaction to be confirmed")
//return s.waitUntilUTXOsConfirmed()
}

View file

@ -17,6 +17,12 @@ import (
"syscall"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
@ -60,6 +66,11 @@ type Sync struct {
TakeOverExistingChannel bool
Refill int
Manager *SyncManager
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
daemon *jsonrpc.Client
claimAddress string
@ -114,6 +125,97 @@ func (s *Sync) IsInterrupted() bool {
}
}
func (s *Sync) downloadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s3Session)
out, err := os.Create(defaultTempWalletDir)
if err != nil {
return err
}
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
})
if err != nil {
// Casting to the awserr.Error type will allow you to inspect the error
// code returned by the service in code. The error code can be used
// to switch on context specific functionality. In this case a context
// specific error message is printed to the user based on the bucket
// and key existing.
//
// For information on other S3 API error codes see:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if aerr, ok := err.(awserr.Error); ok {
code := aerr.Code()
if code == s3.ErrCodeNoSuchKey {
return errors.Err("wallet not on S3")
}
}
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return os.Rename(defaultTempWalletDir, defaultWalletDir)
}
func (s *Sync) uploadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) {
return errors.Err("default_wallet does not exist")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s3Session)
file, err := os.Open(defaultWalletDir)
if err != nil {
return err
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
Body: file,
})
if err != nil {
return err
}
return os.Remove(defaultWalletDir)
}
func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found")
@ -141,67 +243,18 @@ func (s *Sync) FullCycle() (e error) {
s.syncedVideos = syncedVideos
s.syncedVideosMux.Unlock()
defer func() {
if e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice(e.Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
e = err
}
}
}()
defer s.updateChannelStatus(&e)
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
}
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
err = os.Rename(walletBackupDir, defaultWalletDir)
if err != nil {
return errors.Wrap(err, 0)
}
err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" {
return errors.Prefix("failure in downloading wallet: ", err)
} else if err == nil {
log.Println("Continuing previous upload")
} else {
log.Println("Starting new wallet")
}
defer func() {
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
walletErr := os.Rename(defaultWalletDir, walletBackupDir)
if walletErr != nil {
log.Errorf("error moving wallet to backup dir: %v", walletErr)
}
}
}
}()
defer s.stopAndUploadWallet(&e)
s.videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil {
@ -218,18 +271,9 @@ func (s *Sync) FullCycle() (e error) {
s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(40 * time.Minute)
WaitForDaemonStart:
for {
select {
case <-s.grp.Ch():
return nil
default:
_, err := s.daemon.WalletBalance()
if err == nil {
break WaitForDaemonStart
}
time.Sleep(5 * time.Second)
}
err = s.waitForDaemonStart()
if err != nil {
return err
}
err = s.doSync()
@ -244,6 +288,69 @@ WaitForDaemonStart:
return nil
}
func (s *Sync) updateChannelStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
*e = err
}
}
}
func (s *Sync) waitForDaemonStart() error {
for {
select {
case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup")
default:
_, err := s.daemon.WalletBalance()
if err == nil {
return nil
}
time.Sleep(5 * time.Second)
}
}
}
func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err
return
} else {
*e = errors.Prefix("failure uploading wallet: ", e)
}
}
}
}
}
func logShutdownError(shutdownErr error) {
SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
@ -316,6 +423,7 @@ func (s *Sync) startWorker(workerNum int) {
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.",
}
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
@ -337,14 +445,21 @@ func (s *Sync) startWorker(workerNum int) {
log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
strings.Contains(err.Error(), "failed: Not enough funds") ||
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") ||
strings.Contains(err.Error(), "too-long-mempool-chain") {
log.Println("waiting for a block and refilling addresses before retrying")
log.Println("waiting for a block before retrying")
err = s.waitForNewBlock()
if err != nil {
s.grp.Stop()
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
break
}
} else if strings.Contains(err.Error(), "failed: Not enough funds") ||
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") {
log.Println("refilling addresses before retrying")
err = s.walletSetup()
if err != nil {
s.grp.Stop()
SendErrorToSlack("Failed to setup the wallet for a refill: %v", err)
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break
}
}