From 070287716bedfbb5b03ebc6a5d039f467dd98f10 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 24 Nov 2021 05:54:08 +0100 Subject: [PATCH] switch from env vars to a config file get rid of stupid stuff simplify S3 configuration split wallets from blockchain.db and use separate S3 store fix bugs --- configs/configs.go | 52 ++++++++++++++++ downloader/downloader.go | 3 + downloader/downloader_test.go | 7 +-- e2e/e2e.sh | 11 ++-- go.mod | 1 + go.sum | 3 + main.go | 79 +++++++++--------------- manager/manager.go | 4 +- manager/s3_storage.go | 98 +++++++++++++++++------------- manager/setup.go | 7 +-- manager/ytsync.go | 5 +- sdk/api.go | 7 +-- shared/shared.go | 16 ----- sources/youtubeVideo.go | 13 ++-- thumbs/uploader.go | 11 ++-- util/archive.go | 109 ++++++++++++++++++++++++++++++++++ util/util.go | 5 +- ytapi/ytapi.go | 6 +- 18 files changed, 282 insertions(+), 155 deletions(-) create mode 100644 configs/configs.go create mode 100644 util/archive.go diff --git a/configs/configs.go b/configs/configs.go new file mode 100644 index 0000000..baad17c --- /dev/null +++ b/configs/configs.go @@ -0,0 +1,52 @@ +package configs + +import ( + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/tkanos/gonfig" +) + +type S3Configs struct { + ID string `json:"id"` + Secret string `json:"secret"` + Region string `json:"region"` + Bucket string `json:"bucket"` + Endpoint string `json:"endpoint"` +} +type Configs struct { + SlackToken string `json:"slack_token"` + SlackChannel string `json:"slack_channel"` + InternalApisEndpoint string `json:"internal_apis_endpoint"` + InternalApisAuthToken string `json:"internal_apis_auth_token"` + LbrycrdString string `json:"lbrycrd_string"` + WalletS3Config S3Configs `json:"wallet_s3_config"` + BlockchaindbS3Config S3Configs `json:"blockchaindb_s3_config"` + AWSThumbnailsS3Config S3Configs `json:"aws_thumbnails_s3_config"` + ThumbnailsS3Config S3Configs `json:"thumbnails_s3_config"` +} + +var Configuration *Configs + +func Init(configPath string) error { + if Configuration != nil { + return nil + } + c := Configs{} + err := gonfig.GetConf(configPath, &c) + if err != nil { + return errors.Err(err) + } + Configuration = &c + return nil +} + +func (s *S3Configs) GetS3AWSConfig() *aws.Config { + return &aws.Config{ + Credentials: credentials.NewStaticCredentials(s.ID, s.Secret, ""), + Region: &s.Region, + Endpoint: &s.Endpoint, + S3ForcePathStyle: aws.Bool(true), + } +} diff --git a/downloader/downloader.go b/downloader/downloader.go index 0d8dff0..2b7ee06 100644 --- a/downloader/downloader.go +++ b/downloader/downloader.go @@ -36,6 +36,9 @@ func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan, } videoIDs := make([]string, 0, maxVideos) for i, v := range ids { + if v == "" { + continue + } logrus.Debugf("%d - video id %s", i, v) if i >= maxVideos { break diff --git a/downloader/downloader_test.go b/downloader/downloader_test.go index f4e981a..92e5f13 100644 --- a/downloader/downloader_test.go +++ b/downloader/downloader_test.go @@ -29,12 +29,7 @@ func TestGetVideoInformation(t *testing.T) { } func Test_getUploadTime(t *testing.T) { - configs := sdk.APIConfig{ - YoutubeAPIKey: "", - ApiURL: "https://api.lbry.com", - ApiToken: "Ht4NETrL5oWKyAaZkuSV68BKhtXkiLh5", - HostName: "test", - } + configs := sdk.APIConfig{} got, err := getUploadTime(&configs, "kDGOHNpRjzc", nil, "20060102") assert.NoError(t, err) t.Log(got) diff --git a/e2e/e2e.sh b/e2e/e2e.sh index 467db08..5921a1d 100755 --- a/e2e/e2e.sh +++ b/e2e/e2e.sh @@ -14,11 +14,8 @@ export LOCAL_TMP_DIR="/var/tmp:/var/tmp" touch -a .env && set -o allexport; source ./.env; set +o allexport echo "LOCAL_TMP_DIR=$LOCAL_TMP_DIR" # Compose settings - docker only -export SLACK_CHANNEL="ytsync-travis" -export LBRY_API_TOKEN="ytsyntoken" -export LBRY_WEB_API="http://localhost:15400" export LBRYNET_ADDRESS="http://localhost:15100" -export LBRYCRD_STRING="tcp://lbry:lbry@localhost:15200" +export LBRYCRD_STRING="tcp://lbry:lbry@localhost:15200" #required for supporty export LBRYNET_USE_DOCKER=true export REFLECT_BLOBS=false export CLEAN_ON_STARTUP=true @@ -50,9 +47,9 @@ until curl --output /dev/null --silent --head --fail http://localhost:15400; do done echo "successfully started..." -channelToSync="UCGyoEsIRjmnmzrsB67DhrOA" -channelName=@Alaminemoh11"$(date +%s)" -latestVideoID="ejWF7Jjdgmc" +channelToSync="UCMn-zv1SE-2y6vyewscfFqw" +channelName=@whatever"$(date +%s)" +latestVideoID="yPJgjiMbmX0" #Data Setup for test ./data_setup.sh "$channelName" "$channelToSync" "$latestVideoID" diff --git a/go.mod b/go.mod index 40b2c65..98b24d3 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.7.0 + github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f github.com/vbauerster/mpb/v7 v7.0.2 google.golang.org/appengine v1.6.5 // indirect gopkg.in/ini.v1 v1.60.2 // indirect diff --git a/go.sum b/go.sum index 3b1c9d2..eee63cf 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,7 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= @@ -488,6 +489,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= +github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f h1:xDFq4NVQD34ekH5UsedBSgfxsBuPU2aZf7v4t0tH2jY= +github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f/go.mod h1:DaZPBuToMc2eezA9R9nDAnmS2RMwL7yEa5YD36ESQdI= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= diff --git a/main.go b/main.go index 9e93c59..b8ff1cf 100644 --- a/main.go +++ b/main.go @@ -7,14 +7,16 @@ import ( "os" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/lbryio/ytsync/v5/configs" "github.com/lbryio/ytsync/v5/manager" "github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/shared" ytUtils "github.com/lbryio/ytsync/v5/util" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/util" + + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -69,10 +71,14 @@ func main() { } func ytSync(cmd *cobra.Command, args []string) { + err := configs.Init("./config.json") + if err != nil { + log.Fatalf("could not parse configuration file: %s", errors.FullTrace(err)) + } var hostname string - slackToken := os.Getenv("SLACK_TOKEN") - if slackToken == "" { - log.Error("A slack token was not present in env vars! Slack messages disabled!") + + if configs.Configuration.SlackToken == "" { + log.Error("A slack token was not present in the config! Slack messages disabled!") } else { var err error hostname, err = os.Hostname() @@ -84,7 +90,7 @@ func ytSync(cmd *cobra.Command, args []string) { hostname = hostname[0:30] } - util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname) + util.InitSlack(configs.Configuration.SlackToken, configs.Configuration.SlackChannel, hostname) } if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) { @@ -103,68 +109,41 @@ func ytSync(cmd *cobra.Command, args []string) { } cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour - apiURL := os.Getenv("LBRY_WEB_API") - apiToken := os.Getenv("LBRY_API_TOKEN") - youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") - lbrycrdDsn := os.Getenv("LBRYCRD_STRING") - awsS3ID := os.Getenv("AWS_S3_ID") - awsS3Secret := os.Getenv("AWS_S3_SECRET") - awsS3Region := os.Getenv("AWS_S3_REGION") - awsS3Bucket := os.Getenv("AWS_S3_BUCKET") - if apiURL == "" { - log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API") + if configs.Configuration.InternalApisEndpoint == "" { + log.Errorln("An Internal APIs Endpoint was not defined") return } - if apiToken == "" { - log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN") + if configs.Configuration.InternalApisAuthToken == "" { + log.Errorln("An Internal APIs auth token was not defined") return } - if youtubeAPIKey == "" { - log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") + if configs.Configuration.WalletS3Config.ID == "" || configs.Configuration.WalletS3Config.Region == "" || configs.Configuration.WalletS3Config.Bucket == "" || configs.Configuration.WalletS3Config.Secret == "" || configs.Configuration.WalletS3Config.Endpoint == "" { + log.Errorln("Wallet S3 configuration is incomplete") return } - if awsS3ID == "" { - log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID") + if configs.Configuration.BlockchaindbS3Config.ID == "" || configs.Configuration.BlockchaindbS3Config.Region == "" || configs.Configuration.BlockchaindbS3Config.Bucket == "" || configs.Configuration.BlockchaindbS3Config.Secret == "" || configs.Configuration.BlockchaindbS3Config.Endpoint == "" { + log.Errorln("Blockchain DBs S3 configuration is incomplete") return } - if awsS3Secret == "" { - log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET") - return - } - if awsS3Region == "" { - log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION") - return - } - if awsS3Bucket == "" { - log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET") - return - } - if lbrycrdDsn == "" { - log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") + if configs.Configuration.LbrycrdString == "" { + log.Infoln("Using default (local) lbrycrd instance. Set lbrycrd_string if you want to use something else") } blobsDir := ytUtils.GetBlobsDir() apiConfig := &sdk.APIConfig{ - YoutubeAPIKey: youtubeAPIKey, - ApiURL: apiURL, - ApiToken: apiToken, - HostName: hostname, - } - awsConfig := &shared.AwsConfigs{ - AwsS3ID: awsS3ID, - AwsS3Secret: awsS3Secret, - AwsS3Region: awsS3Region, - AwsS3Bucket: awsS3Bucket, + ApiURL: configs.Configuration.InternalApisEndpoint, + ApiToken: configs.Configuration.InternalApisAuthToken, + HostName: hostname, } + sm := manager.NewSyncManager( cliFlags, blobsDir, - lbrycrdDsn, - awsConfig, + configs.Configuration.LbrycrdString, apiConfig, ) - err := sm.Start() + err = sm.Start() if err != nil { ytUtils.SendErrorToSlack(errors.FullTrace(err)) } diff --git a/manager/manager.go b/manager/manager.go index e01ffc6..d5abf79 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -24,18 +24,16 @@ type SyncManager struct { CliFlags shared.SyncFlags ApiConfig *sdk.APIConfig LbrycrdDsn string - AwsConfigs *shared.AwsConfigs blobsDir string channelsToSync []Sync } -func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, awsConfigs *shared.AwsConfigs, apiConfig *sdk.APIConfig) *SyncManager { +func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, apiConfig *sdk.APIConfig) *SyncManager { return &SyncManager{ CliFlags: cliFlags, blobsDir: blobsDir, LbrycrdDsn: lbrycrdDsn, - AwsConfigs: awsConfigs, ApiConfig: apiConfig, } } diff --git a/manager/s3_storage.go b/manager/s3_storage.go index 2b7f6e1..f36e216 100644 --- a/manager/s3_storage.go +++ b/manager/s3_storage.go @@ -3,6 +3,12 @@ package manager import ( "os" "path/filepath" + "strings" + + "github.com/lbryio/ytsync/v5/configs" + "github.com/lbryio/ytsync/v5/util" + + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -10,24 +16,21 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" log "github.com/sirupsen/logrus" - - "github.com/lbryio/lbry.go/v2/extras/errors" - - logUtils "github.com/lbryio/ytsync/v5/util" ) -func (s *Sync) getS3Downloader() (*s3manager.Downloader, error) { - s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig()) +func (s *Sync) getS3Downloader(config *aws.Config) (*s3manager.Downloader, error) { + s3Session, err := session.NewSession(config) if err != nil { - return nil, errors.Prefix("error starting session: ", err) + return nil, errors.Prefix("error starting session", err) } downloader := s3manager.NewDownloader(s3Session) return downloader, nil } -func (s *Sync) getS3Uploader() (*s3manager.Uploader, error) { - s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig()) + +func (s *Sync) getS3Uploader(config *aws.Config) (*s3manager.Uploader, error) { + s3Session, err := session.NewSession(config) if err != nil { - return nil, errors.Prefix("error starting session: ", err) + return nil, errors.Prefix("error starting session", err) } uploader := s3manager.NewUploader(s3Session) return uploader, nil @@ -38,18 +41,18 @@ func (s *Sync) downloadWallet() error { if err != nil { return errors.Err(err) } - downloader, err := s.getS3Downloader() + downloader, err := s.getS3Downloader(configs.Configuration.WalletS3Config.GetS3AWSConfig()) if err != nil { return err } out, err := os.Create(defaultTempWalletDir) if err != nil { - return errors.Prefix("error creating temp wallet: ", err) + return errors.Prefix("error creating temp wallet", err) } defer out.Close() bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ - Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), + Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket), Key: key, }) if err != nil { @@ -74,21 +77,21 @@ func (s *Sync) downloadWallet() error { err = os.Rename(defaultTempWalletDir, defaultWalletDir) if err != nil { - return errors.Prefix("error replacing temp wallet for default wallet: ", err) + return errors.Prefix("error replacing temp wallet for default wallet", err) } return nil } func (s *Sync) downloadBlockchainDB() error { - if logUtils.IsRegTest() { - return nil // tests fail if we re-use the same blockchain DB - } - defaultBDBDir, defaultTempBDBDir, key, err := s.getBlockchainDBPaths() + //if util.IsRegTest() { + // return nil // tests fail if we re-use the same blockchain DB + //} + defaultBDBPath, defaultTempBDBPath, key, err := s.getBlockchainDBPaths() if err != nil { return errors.Err(err) } - files, err := filepath.Glob(defaultBDBDir + "*") + files, err := filepath.Glob(defaultBDBPath + "*") if err != nil { return errors.Err(err) } @@ -101,18 +104,18 @@ func (s *Sync) downloadBlockchainDB() error { if s.DbChannelData.WipeDB { return nil } - downloader, err := s.getS3Downloader() + downloader, err := s.getS3Downloader(configs.Configuration.BlockchaindbS3Config.GetS3AWSConfig()) if err != nil { return errors.Err(err) } - out, err := os.Create(defaultTempBDBDir) + out, err := os.Create(defaultTempBDBPath) if err != nil { - return errors.Prefix("error creating temp wallet: ", err) + return errors.Prefix("error creating temp blockchain DB file", err) } defer out.Close() bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ - Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), + Bucket: aws.String(configs.Configuration.BlockchaindbS3Config.Bucket), Key: key, }) if err != nil { @@ -135,11 +138,13 @@ func (s *Sync) downloadBlockchainDB() error { return errors.Err("zero bytes written") } - err = os.Rename(defaultTempBDBDir, defaultBDBDir) + blockchainDbDir := strings.Replace(defaultBDBPath, "blockchain.db", "", -1) + err = util.Untar(defaultTempBDBPath, blockchainDbDir) if err != nil { - return errors.Prefix("error replacing temp blockchain.db for default blockchain.db: ", err) + return errors.Prefix("error extracting blockchain.db files", err) } - log.Printf("blockchain.db downloaded to %s", defaultBDBDir) + + log.Printf("blockchain.db data downloaded and extracted to %s", blockchainDbDir) return nil } @@ -147,7 +152,7 @@ func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string, defaultWallet = os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" tempWallet = os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet" key = aws.String("/wallets/" + s.DbChannelData.ChannelId) - if logUtils.IsRegTest() { + if util.IsRegTest() { defaultWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" tempWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet" key = aws.String("/regtest/" + s.DbChannelData.ChannelId) @@ -168,27 +173,27 @@ func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string, func (s *Sync) getBlockchainDBPaths() (defaultDB, tempDB string, key *string, err error) { lbryumDir := os.Getenv("LBRYUM_DIR") if lbryumDir == "" { - if logUtils.IsRegTest() { + if util.IsRegTest() { lbryumDir = os.Getenv("HOME") + "/.lbryum_regtest" } else { lbryumDir = os.Getenv("HOME") + "/.lbryum" } } defaultDB = lbryumDir + "/lbc_mainnet/blockchain.db" - tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.db" - key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId) - if logUtils.IsRegTest() { + tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.tar" + key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId + ".tar") + if util.IsRegTest() { defaultDB = lbryumDir + "/lbc_regtest/blockchain.db" - tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.db" - key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId) + tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.tar" + key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId + ".tar") } return } func (s *Sync) uploadWallet() error { - defaultWalletDir := logUtils.GetDefaultWalletPath() + defaultWalletDir := util.GetDefaultWalletPath() key := aws.String("/wallets/" + s.DbChannelData.ChannelId) - if logUtils.IsRegTest() { + if util.IsRegTest() { key = aws.String("/regtest/" + s.DbChannelData.ChannelId) } @@ -196,7 +201,7 @@ func (s *Sync) uploadWallet() error { return errors.Err("default_wallet does not exist") } - uploader, err := s.getS3Uploader() + uploader, err := s.getS3Uploader(configs.Configuration.WalletS3Config.GetS3AWSConfig()) if err != nil { return err } @@ -208,7 +213,7 @@ func (s *Sync) uploadWallet() error { defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), + Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket), Key: key, Body: file, }) @@ -229,26 +234,35 @@ func (s *Sync) uploadBlockchainDB() error { if _, err := os.Stat(defaultBDBDir); os.IsNotExist(err) { return errors.Err("blockchain.db does not exist") } - - uploader, err := s.getS3Uploader() + files, err := filepath.Glob(defaultBDBDir + "*") + if err != nil { + return errors.Err(err) + } + tarPath := strings.Replace(defaultBDBDir, "blockchain.db", "", -1) + s.DbChannelData.ChannelId + ".tar" + err = util.CreateTarball(tarPath, files) if err != nil { return err } - file, err := os.Open(defaultBDBDir) + uploader, err := s.getS3Uploader(configs.Configuration.BlockchaindbS3Config.GetS3AWSConfig()) + if err != nil { + return err + } + + file, err := os.Open(tarPath) if err != nil { return err } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), + Bucket: aws.String(configs.Configuration.BlockchaindbS3Config.Bucket), Key: key, Body: file, }) if err != nil { return err } - log.Println("blockchain.db uploaded to S3") + log.Println("blockchain.db files uploaded to S3") return os.Remove(defaultBDBDir) } diff --git a/manager/setup.go b/manager/setup.go index cfb1670..d606707 100644 --- a/manager/setup.go +++ b/manager/setup.go @@ -311,12 +311,12 @@ func (s *Sync) waitForNewBlock() error { func (s *Sync) GenerateRegtestBlock() error { lbrycrd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn) if err != nil { - return errors.Prefix("error getting lbrycrd client: ", err) + return errors.Prefix("error getting lbrycrd client", err) } txs, err := lbrycrd.Generate(1) if err != nil { - return errors.Prefix("error generating new block: ", err) + return errors.Prefix("error generating new block", err) } for _, tx := range txs { @@ -407,7 +407,7 @@ func (s *Sync) ensureChannelOwnership() error { } thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL - thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig()) + thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId) if err != nil { return err } @@ -416,7 +416,6 @@ func (s *Sync) ensureChannelOwnership() error { if channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails != nil { bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL, "banner-"+s.DbChannelData.ChannelId, - *s.Manager.AwsConfigs.GetS3AWSConfig(), ) if err != nil { return err diff --git a/manager/ytsync.go b/manager/ytsync.go index 04cae59..4ac42b3 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -358,9 +358,9 @@ func (s *Sync) stopAndUploadWallet(e *error) { err = s.uploadBlockchainDB() if err != nil { if *e == nil { - e = &err + *e = err } else { - *e = errors.Prefix("failure uploading wallet", *e) + *e = errors.Prefix(fmt.Sprintf("failure uploading blockchain DB: %s + original error", errors.FullTrace(err)), *e) } } } @@ -863,7 +863,6 @@ func (s *Sync) enqueueYoutubeVideos() error { videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{ VideoDir: s.videoDirectory, - S3Config: *s.Manager.AwsConfigs.GetS3AWSConfig(), Stopper: s.grp, IPPool: ipPool, }, s.DbChannelData.LastUploadedVideo) diff --git a/sdk/api.go b/sdk/api.go index f42d298..3314825 100644 --- a/sdk/api.go +++ b/sdk/api.go @@ -25,10 +25,9 @@ const ( ) type APIConfig struct { - YoutubeAPIKey string - ApiURL string - ApiToken string - HostName string + ApiURL string + ApiToken string + HostName string } func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) { diff --git a/shared/shared.go b/shared/shared.go index 10474e5..fe9bbb8 100644 --- a/shared/shared.go +++ b/shared/shared.go @@ -4,8 +4,6 @@ import ( "encoding/json" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" "github.com/lbryio/lbry.go/v2/extras/errors" ) @@ -213,17 +211,3 @@ const ( TransferStateComplete TransferStateManual ) - -type AwsConfigs struct { - AwsS3ID string - AwsS3Secret string - AwsS3Region string - AwsS3Bucket string -} - -func (a *AwsConfigs) GetS3AWSConfig() *aws.Config { - return &aws.Config{ - Credentials: credentials.NewStaticCredentials(a.AwsS3ID, a.AwsS3Secret, ""), - Region: &a.AwsS3Region, - } -} diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index e824013..99c5fc0 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -37,7 +37,6 @@ import ( "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/aws/aws-sdk-go/aws" "github.com/shopspring/decimal" log "github.com/sirupsen/logrus" ) @@ -55,7 +54,6 @@ type YoutubeVideo struct { youtubeInfo *ytdl.YtdlVideo youtubeChannelID string tags []string - awsConfig aws.Config thumbnailURL string lbryChannelID string mocked bool @@ -101,7 +99,7 @@ var youtubeCategories = map[string]string{ "44": "trailers", } -func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) (*YoutubeVideo, error) { +func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPosition int64, stopGroup *stop.Group, pool *ip_manager.IPPool) (*YoutubeVideo, error) { // youtube-dl returns times in local timezone sometimes. this could break in the future // maybe we can file a PR to choose the timezone we want from youtube-dl return &YoutubeVideo{ @@ -112,19 +110,18 @@ func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPositi publishedAt: videoData.UploadDateForReal, dir: directory, youtubeInfo: videoData, - awsConfig: awsConfig, mocked: false, youtubeChannelID: videoData.ChannelID, stopGroup: stopGroup, pool: pool, }, nil } -func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo { + +func NewMockedVideo(directory string, videoID string, youtubeChannelID string, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo { return &YoutubeVideo{ id: videoID, playlistPosition: 0, dir: directory, - awsConfig: awsConfig, mocked: true, youtubeChannelID: youtubeChannelID, stopGroup: stopGroup, @@ -688,7 +685,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() (err error) { if thumbnail.Width == 0 { return errors.Err("default youtube thumbnail found") } - v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID(), v.awsConfig) + v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID()) return err } @@ -880,7 +877,7 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis return nil, errors.Err("could not find thumbnail for mocked video") } thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Thumbnails) - thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID(), v.awsConfig) + thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID()) } else { thumbnailURL = thumbs.ThumbnailEndpoint + v.ID() } diff --git a/thumbs/uploader.go b/thumbs/uploader.go index e293a6b..01da615 100644 --- a/thumbs/uploader.go +++ b/thumbs/uploader.go @@ -6,6 +6,7 @@ import ( "os" "strings" + "github.com/lbryio/ytsync/v5/configs" "github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -83,11 +84,11 @@ func (u *thumbnailUploader) deleteTmpFile() { log.Infof("failed to delete local thumbnail file: %s", err.Error()) } } -func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, error) { +func MirrorThumbnail(url string, name string) (string, error) { tu := thumbnailUploader{ originalUrl: url, name: name, - s3Config: s3Config, + s3Config: *configs.Configuration.AWSThumbnailsS3Config.GetS3AWSConfig(), } err := tu.downloadThumbnail() if err != nil { @@ -100,14 +101,12 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro return "", err } - ownS3Config := s3Config.Copy(&aws.Config{Endpoint: aws.String("s3.lbry.tech")}) - + //this is our own S3 storage tu2 := thumbnailUploader{ originalUrl: url, name: name, - s3Config: *ownS3Config, + s3Config: *configs.Configuration.ThumbnailsS3Config.GetS3AWSConfig(), } - //own S3 err = tu2.uploadThumbnail() if err != nil { return "", err diff --git a/util/archive.go b/util/archive.go new file mode 100644 index 0000000..b18936c --- /dev/null +++ b/util/archive.go @@ -0,0 +1,109 @@ +package util + +import ( + "archive/tar" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/lbryio/lbry.go/v2/extras/errors" +) + +func CreateTarball(tarballFilePath string, filePaths []string) error { + file, err := os.Create(tarballFilePath) + if err != nil { + return errors.Err("Could not create tarball file '%s', got error '%s'", tarballFilePath, err.Error()) + } + defer file.Close() + + tarWriter := tar.NewWriter(file) + defer tarWriter.Close() + + for _, filePath := range filePaths { + err := addFileToTarWriter(filePath, tarWriter) + if err != nil { + return errors.Err("Could not add file '%s', to tarball, got error '%s'", filePath, err.Error()) + } + } + + return nil +} + +func addFileToTarWriter(filePath string, tarWriter *tar.Writer) error { + file, err := os.Open(filePath) + if err != nil { + return errors.Err("Could not open file '%s', got error '%s'", filePath, err.Error()) + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return errors.Err("Could not get stat for file '%s', got error '%s'", filePath, err.Error()) + } + + header := &tar.Header{ + Name: stat.Name(), + Size: stat.Size(), + Mode: int64(stat.Mode()), + ModTime: stat.ModTime(), + } + + err = tarWriter.WriteHeader(header) + if err != nil { + return errors.Err("Could not write header for file '%s', got error '%s'", filePath, err.Error()) + } + + _, err = io.Copy(tarWriter, file) + if err != nil { + return errors.Err("Could not copy the file '%s' data to the tarball, got error '%s'", filePath, err.Error()) + } + + return nil +} + +func Untar(tarball, target string) error { + reader, err := os.Open(tarball) + if err != nil { + return errors.Err(err) + } + defer reader.Close() + tarReader := tar.NewReader(reader) + + for { + header, err := tarReader.Next() + if err == io.EOF { + break + } else if err != nil { + return errors.Err(err) + } + + path := filepath.Join(target, header.Name) + info := header.FileInfo() + if info.IsDir() { + if err = os.MkdirAll(path, info.Mode()); err != nil { + return errors.Err(err) + } + continue + } + + err = extractFile(path, info, tarReader) + if err != nil { + return err + } + } + return nil +} + +func extractFile(path string, info fs.FileInfo, tarReader *tar.Reader) error { + file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode()) + if err != nil { + return errors.Err(err) + } + defer file.Close() + _, err = io.Copy(file, tarReader) + if err != nil { + return errors.Err(err) + } + return nil +} diff --git a/util/util.go b/util/util.go index 5506bc6..2afcb1f 100644 --- a/util/util.go +++ b/util/util.go @@ -11,6 +11,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/lbrycrd" + "github.com/lbryio/ytsync/v5/configs" "github.com/lbryio/ytsync/v5/timing" "github.com/docker/docker/api/types" @@ -185,9 +186,9 @@ func CleanForStartup() error { return errors.Err(err) } - lbrycrd, err := GetLbrycrdClient(os.Getenv("LBRYCRD_STRING")) + lbrycrd, err := GetLbrycrdClient(configs.Configuration.LbrycrdString) if err != nil { - return errors.Prefix("error getting lbrycrd client: ", err) + return errors.Prefix("error getting lbrycrd client", err) } height, err := lbrycrd.GetBlockCount() if err != nil { diff --git a/ytapi/ytapi.go b/ytapi/ytapi.go index 3aba9b1..915e635 100644 --- a/ytapi/ytapi.go +++ b/ytapi/ytapi.go @@ -29,7 +29,6 @@ import ( "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/aws/aws-sdk-go/aws" log "github.com/sirupsen/logrus" ) @@ -50,7 +49,6 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[ type VideoParams struct { VideoDir string - S3Config aws.Config Stopper *stop.Group IPPool *ip_manager.IPPool } @@ -103,7 +101,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s for _, item := range vids { positionInList := playlistMap[item.ID] - videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool) + videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.Stopper, videoParams.IPPool) if err != nil { return nil, errors.Err(err) } @@ -115,7 +113,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s continue } if _, ok := playlistMap[k]; !ok { - videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool)) + videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.Stopper, videoParams.IPPool)) } }