switch from env vars to a config file

get rid of stupid stuff
simplify S3 configuration
split wallets from blockchain.db and use separate S3 store
fix bugs
This commit is contained in:
Niko Storni 2021-11-24 05:54:08 +01:00
parent 41054e77a6
commit 070287716b
18 changed files with 282 additions and 155 deletions

52
configs/configs.go Normal file
View file

@ -0,0 +1,52 @@
package configs
import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/tkanos/gonfig"
)
type S3Configs struct {
ID string `json:"id"`
Secret string `json:"secret"`
Region string `json:"region"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
}
type Configs struct {
SlackToken string `json:"slack_token"`
SlackChannel string `json:"slack_channel"`
InternalApisEndpoint string `json:"internal_apis_endpoint"`
InternalApisAuthToken string `json:"internal_apis_auth_token"`
LbrycrdString string `json:"lbrycrd_string"`
WalletS3Config S3Configs `json:"wallet_s3_config"`
BlockchaindbS3Config S3Configs `json:"blockchaindb_s3_config"`
AWSThumbnailsS3Config S3Configs `json:"aws_thumbnails_s3_config"`
ThumbnailsS3Config S3Configs `json:"thumbnails_s3_config"`
}
var Configuration *Configs
func Init(configPath string) error {
if Configuration != nil {
return nil
}
c := Configs{}
err := gonfig.GetConf(configPath, &c)
if err != nil {
return errors.Err(err)
}
Configuration = &c
return nil
}
func (s *S3Configs) GetS3AWSConfig() *aws.Config {
return &aws.Config{
Credentials: credentials.NewStaticCredentials(s.ID, s.Secret, ""),
Region: &s.Region,
Endpoint: &s.Endpoint,
S3ForcePathStyle: aws.Bool(true),
}
}

View file

@ -36,6 +36,9 @@ func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan,
} }
videoIDs := make([]string, 0, maxVideos) videoIDs := make([]string, 0, maxVideos)
for i, v := range ids { for i, v := range ids {
if v == "" {
continue
}
logrus.Debugf("%d - video id %s", i, v) logrus.Debugf("%d - video id %s", i, v)
if i >= maxVideos { if i >= maxVideos {
break break

View file

@ -29,12 +29,7 @@ func TestGetVideoInformation(t *testing.T) {
} }
func Test_getUploadTime(t *testing.T) { func Test_getUploadTime(t *testing.T) {
configs := sdk.APIConfig{ configs := sdk.APIConfig{}
YoutubeAPIKey: "",
ApiURL: "https://api.lbry.com",
ApiToken: "Ht4NETrL5oWKyAaZkuSV68BKhtXkiLh5",
HostName: "test",
}
got, err := getUploadTime(&configs, "kDGOHNpRjzc", nil, "20060102") got, err := getUploadTime(&configs, "kDGOHNpRjzc", nil, "20060102")
assert.NoError(t, err) assert.NoError(t, err)
t.Log(got) t.Log(got)

View file

@ -14,11 +14,8 @@ export LOCAL_TMP_DIR="/var/tmp:/var/tmp"
touch -a .env && set -o allexport; source ./.env; set +o allexport touch -a .env && set -o allexport; source ./.env; set +o allexport
echo "LOCAL_TMP_DIR=$LOCAL_TMP_DIR" echo "LOCAL_TMP_DIR=$LOCAL_TMP_DIR"
# Compose settings - docker only # Compose settings - docker only
export SLACK_CHANNEL="ytsync-travis"
export LBRY_API_TOKEN="ytsyntoken"
export LBRY_WEB_API="http://localhost:15400"
export LBRYNET_ADDRESS="http://localhost:15100" export LBRYNET_ADDRESS="http://localhost:15100"
export LBRYCRD_STRING="tcp://lbry:lbry@localhost:15200" export LBRYCRD_STRING="tcp://lbry:lbry@localhost:15200" #required for supporty
export LBRYNET_USE_DOCKER=true export LBRYNET_USE_DOCKER=true
export REFLECT_BLOBS=false export REFLECT_BLOBS=false
export CLEAN_ON_STARTUP=true export CLEAN_ON_STARTUP=true
@ -50,9 +47,9 @@ until curl --output /dev/null --silent --head --fail http://localhost:15400; do
done done
echo "successfully started..." echo "successfully started..."
channelToSync="UCGyoEsIRjmnmzrsB67DhrOA" channelToSync="UCMn-zv1SE-2y6vyewscfFqw"
channelName=@Alaminemoh11"$(date +%s)" channelName=@whatever"$(date +%s)"
latestVideoID="ejWF7Jjdgmc" latestVideoID="yPJgjiMbmX0"
#Data Setup for test #Data Setup for test
./data_setup.sh "$channelName" "$channelToSync" "$latestVideoID" ./data_setup.sh "$channelName" "$channelToSync" "$latestVideoID"

1
go.mod
View file

@ -32,6 +32,7 @@ require (
github.com/spf13/cobra v0.0.5 github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f
github.com/vbauerster/mpb/v7 v7.0.2 github.com/vbauerster/mpb/v7 v7.0.2
google.golang.org/appengine v1.6.5 // indirect google.golang.org/appengine v1.6.5 // indirect
gopkg.in/ini.v1 v1.60.2 // indirect gopkg.in/ini.v1 v1.60.2 // indirect

3
go.sum
View file

@ -111,6 +111,7 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
@ -488,6 +489,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f h1:xDFq4NVQD34ekH5UsedBSgfxsBuPU2aZf7v4t0tH2jY=
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f/go.mod h1:DaZPBuToMc2eezA9R9nDAnmS2RMwL7yEa5YD36ESQdI=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=

79
main.go
View file

@ -7,14 +7,16 @@ import (
"os" "os"
"time" "time"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/manager" "github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util" ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -69,10 +71,14 @@ func main() {
} }
func ytSync(cmd *cobra.Command, args []string) { func ytSync(cmd *cobra.Command, args []string) {
err := configs.Init("./config.json")
if err != nil {
log.Fatalf("could not parse configuration file: %s", errors.FullTrace(err))
}
var hostname string var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" { if configs.Configuration.SlackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!") log.Error("A slack token was not present in the config! Slack messages disabled!")
} else { } else {
var err error var err error
hostname, err = os.Hostname() hostname, err = os.Hostname()
@ -84,7 +90,7 @@ func ytSync(cmd *cobra.Command, args []string) {
hostname = hostname[0:30] hostname = hostname[0:30]
} }
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname) util.InitSlack(configs.Configuration.SlackToken, configs.Configuration.SlackChannel, hostname)
} }
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) { if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
@ -103,68 +109,41 @@ func ytSync(cmd *cobra.Command, args []string) {
} }
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API") if configs.Configuration.InternalApisEndpoint == "" {
apiToken := os.Getenv("LBRY_API_TOKEN") log.Errorln("An Internal APIs Endpoint was not defined")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
return return
} }
if apiToken == "" { if configs.Configuration.InternalApisAuthToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN") log.Errorln("An Internal APIs auth token was not defined")
return return
} }
if youtubeAPIKey == "" { if configs.Configuration.WalletS3Config.ID == "" || configs.Configuration.WalletS3Config.Region == "" || configs.Configuration.WalletS3Config.Bucket == "" || configs.Configuration.WalletS3Config.Secret == "" || configs.Configuration.WalletS3Config.Endpoint == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") log.Errorln("Wallet S3 configuration is incomplete")
return return
} }
if awsS3ID == "" { if configs.Configuration.BlockchaindbS3Config.ID == "" || configs.Configuration.BlockchaindbS3Config.Region == "" || configs.Configuration.BlockchaindbS3Config.Bucket == "" || configs.Configuration.BlockchaindbS3Config.Secret == "" || configs.Configuration.BlockchaindbS3Config.Endpoint == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID") log.Errorln("Blockchain DBs S3 configuration is incomplete")
return return
} }
if awsS3Secret == "" { if configs.Configuration.LbrycrdString == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET") log.Infoln("Using default (local) lbrycrd instance. Set lbrycrd_string if you want to use something else")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
} }
blobsDir := ytUtils.GetBlobsDir() blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{ apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey, ApiURL: configs.Configuration.InternalApisEndpoint,
ApiURL: apiURL, ApiToken: configs.Configuration.InternalApisAuthToken,
ApiToken: apiToken, HostName: hostname,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
} }
sm := manager.NewSyncManager( sm := manager.NewSyncManager(
cliFlags, cliFlags,
blobsDir, blobsDir,
lbrycrdDsn, configs.Configuration.LbrycrdString,
awsConfig,
apiConfig, apiConfig,
) )
err := sm.Start() err = sm.Start()
if err != nil { if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err)) ytUtils.SendErrorToSlack(errors.FullTrace(err))
} }

View file

@ -24,18 +24,16 @@ type SyncManager struct {
CliFlags shared.SyncFlags CliFlags shared.SyncFlags
ApiConfig *sdk.APIConfig ApiConfig *sdk.APIConfig
LbrycrdDsn string LbrycrdDsn string
AwsConfigs *shared.AwsConfigs
blobsDir string blobsDir string
channelsToSync []Sync channelsToSync []Sync
} }
func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, awsConfigs *shared.AwsConfigs, apiConfig *sdk.APIConfig) *SyncManager { func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, apiConfig *sdk.APIConfig) *SyncManager {
return &SyncManager{ return &SyncManager{
CliFlags: cliFlags, CliFlags: cliFlags,
blobsDir: blobsDir, blobsDir: blobsDir,
LbrycrdDsn: lbrycrdDsn, LbrycrdDsn: lbrycrdDsn,
AwsConfigs: awsConfigs,
ApiConfig: apiConfig, ApiConfig: apiConfig,
} }
} }

View file

@ -3,6 +3,12 @@ package manager
import ( import (
"os" "os"
"path/filepath" "path/filepath"
"strings"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
@ -10,24 +16,21 @@ import (
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/errors"
logUtils "github.com/lbryio/ytsync/v5/util"
) )
func (s *Sync) getS3Downloader() (*s3manager.Downloader, error) { func (s *Sync) getS3Downloader(config *aws.Config) (*s3manager.Downloader, error) {
s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig()) s3Session, err := session.NewSession(config)
if err != nil { if err != nil {
return nil, errors.Prefix("error starting session: ", err) return nil, errors.Prefix("error starting session", err)
} }
downloader := s3manager.NewDownloader(s3Session) downloader := s3manager.NewDownloader(s3Session)
return downloader, nil return downloader, nil
} }
func (s *Sync) getS3Uploader() (*s3manager.Uploader, error) {
s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig()) func (s *Sync) getS3Uploader(config *aws.Config) (*s3manager.Uploader, error) {
s3Session, err := session.NewSession(config)
if err != nil { if err != nil {
return nil, errors.Prefix("error starting session: ", err) return nil, errors.Prefix("error starting session", err)
} }
uploader := s3manager.NewUploader(s3Session) uploader := s3manager.NewUploader(s3Session)
return uploader, nil return uploader, nil
@ -38,18 +41,18 @@ func (s *Sync) downloadWallet() error {
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
downloader, err := s.getS3Downloader() downloader, err := s.getS3Downloader(configs.Configuration.WalletS3Config.GetS3AWSConfig())
if err != nil { if err != nil {
return err return err
} }
out, err := os.Create(defaultTempWalletDir) out, err := os.Create(defaultTempWalletDir)
if err != nil { if err != nil {
return errors.Prefix("error creating temp wallet: ", err) return errors.Prefix("error creating temp wallet", err)
} }
defer out.Close() defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket),
Key: key, Key: key,
}) })
if err != nil { if err != nil {
@ -74,21 +77,21 @@ func (s *Sync) downloadWallet() error {
err = os.Rename(defaultTempWalletDir, defaultWalletDir) err = os.Rename(defaultTempWalletDir, defaultWalletDir)
if err != nil { if err != nil {
return errors.Prefix("error replacing temp wallet for default wallet: ", err) return errors.Prefix("error replacing temp wallet for default wallet", err)
} }
return nil return nil
} }
func (s *Sync) downloadBlockchainDB() error { func (s *Sync) downloadBlockchainDB() error {
if logUtils.IsRegTest() { //if util.IsRegTest() {
return nil // tests fail if we re-use the same blockchain DB // return nil // tests fail if we re-use the same blockchain DB
} //}
defaultBDBDir, defaultTempBDBDir, key, err := s.getBlockchainDBPaths() defaultBDBPath, defaultTempBDBPath, key, err := s.getBlockchainDBPaths()
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
files, err := filepath.Glob(defaultBDBDir + "*") files, err := filepath.Glob(defaultBDBPath + "*")
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
@ -101,18 +104,18 @@ func (s *Sync) downloadBlockchainDB() error {
if s.DbChannelData.WipeDB { if s.DbChannelData.WipeDB {
return nil return nil
} }
downloader, err := s.getS3Downloader() downloader, err := s.getS3Downloader(configs.Configuration.BlockchaindbS3Config.GetS3AWSConfig())
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
out, err := os.Create(defaultTempBDBDir) out, err := os.Create(defaultTempBDBPath)
if err != nil { if err != nil {
return errors.Prefix("error creating temp wallet: ", err) return errors.Prefix("error creating temp blockchain DB file", err)
} }
defer out.Close() defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Bucket: aws.String(configs.Configuration.BlockchaindbS3Config.Bucket),
Key: key, Key: key,
}) })
if err != nil { if err != nil {
@ -135,11 +138,13 @@ func (s *Sync) downloadBlockchainDB() error {
return errors.Err("zero bytes written") return errors.Err("zero bytes written")
} }
err = os.Rename(defaultTempBDBDir, defaultBDBDir) blockchainDbDir := strings.Replace(defaultBDBPath, "blockchain.db", "", -1)
err = util.Untar(defaultTempBDBPath, blockchainDbDir)
if err != nil { if err != nil {
return errors.Prefix("error replacing temp blockchain.db for default blockchain.db: ", err) return errors.Prefix("error extracting blockchain.db files", err)
} }
log.Printf("blockchain.db downloaded to %s", defaultBDBDir)
log.Printf("blockchain.db data downloaded and extracted to %s", blockchainDbDir)
return nil return nil
} }
@ -147,7 +152,7 @@ func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string,
defaultWallet = os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" defaultWallet = os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
tempWallet = os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet" tempWallet = os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key = aws.String("/wallets/" + s.DbChannelData.ChannelId) key = aws.String("/wallets/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() { if util.IsRegTest() {
defaultWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" defaultWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
tempWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet" tempWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.DbChannelData.ChannelId) key = aws.String("/regtest/" + s.DbChannelData.ChannelId)
@ -168,27 +173,27 @@ func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string,
func (s *Sync) getBlockchainDBPaths() (defaultDB, tempDB string, key *string, err error) { func (s *Sync) getBlockchainDBPaths() (defaultDB, tempDB string, key *string, err error) {
lbryumDir := os.Getenv("LBRYUM_DIR") lbryumDir := os.Getenv("LBRYUM_DIR")
if lbryumDir == "" { if lbryumDir == "" {
if logUtils.IsRegTest() { if util.IsRegTest() {
lbryumDir = os.Getenv("HOME") + "/.lbryum_regtest" lbryumDir = os.Getenv("HOME") + "/.lbryum_regtest"
} else { } else {
lbryumDir = os.Getenv("HOME") + "/.lbryum" lbryumDir = os.Getenv("HOME") + "/.lbryum"
} }
} }
defaultDB = lbryumDir + "/lbc_mainnet/blockchain.db" defaultDB = lbryumDir + "/lbc_mainnet/blockchain.db"
tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.db" tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.tar"
key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId) key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId + ".tar")
if logUtils.IsRegTest() { if util.IsRegTest() {
defaultDB = lbryumDir + "/lbc_regtest/blockchain.db" defaultDB = lbryumDir + "/lbc_regtest/blockchain.db"
tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.db" tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.tar"
key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId) key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId + ".tar")
} }
return return
} }
func (s *Sync) uploadWallet() error { func (s *Sync) uploadWallet() error {
defaultWalletDir := logUtils.GetDefaultWalletPath() defaultWalletDir := util.GetDefaultWalletPath()
key := aws.String("/wallets/" + s.DbChannelData.ChannelId) key := aws.String("/wallets/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() { if util.IsRegTest() {
key = aws.String("/regtest/" + s.DbChannelData.ChannelId) key = aws.String("/regtest/" + s.DbChannelData.ChannelId)
} }
@ -196,7 +201,7 @@ func (s *Sync) uploadWallet() error {
return errors.Err("default_wallet does not exist") return errors.Err("default_wallet does not exist")
} }
uploader, err := s.getS3Uploader() uploader, err := s.getS3Uploader(configs.Configuration.WalletS3Config.GetS3AWSConfig())
if err != nil { if err != nil {
return err return err
} }
@ -208,7 +213,7 @@ func (s *Sync) uploadWallet() error {
defer file.Close() defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{ _, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket),
Key: key, Key: key,
Body: file, Body: file,
}) })
@ -229,26 +234,35 @@ func (s *Sync) uploadBlockchainDB() error {
if _, err := os.Stat(defaultBDBDir); os.IsNotExist(err) { if _, err := os.Stat(defaultBDBDir); os.IsNotExist(err) {
return errors.Err("blockchain.db does not exist") return errors.Err("blockchain.db does not exist")
} }
files, err := filepath.Glob(defaultBDBDir + "*")
uploader, err := s.getS3Uploader() if err != nil {
return errors.Err(err)
}
tarPath := strings.Replace(defaultBDBDir, "blockchain.db", "", -1) + s.DbChannelData.ChannelId + ".tar"
err = util.CreateTarball(tarPath, files)
if err != nil { if err != nil {
return err return err
} }
file, err := os.Open(defaultBDBDir) uploader, err := s.getS3Uploader(configs.Configuration.BlockchaindbS3Config.GetS3AWSConfig())
if err != nil {
return err
}
file, err := os.Open(tarPath)
if err != nil { if err != nil {
return err return err
} }
defer file.Close() defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{ _, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Bucket: aws.String(configs.Configuration.BlockchaindbS3Config.Bucket),
Key: key, Key: key,
Body: file, Body: file,
}) })
if err != nil { if err != nil {
return err return err
} }
log.Println("blockchain.db uploaded to S3") log.Println("blockchain.db files uploaded to S3")
return os.Remove(defaultBDBDir) return os.Remove(defaultBDBDir)
} }

View file

@ -311,12 +311,12 @@ func (s *Sync) waitForNewBlock() error {
func (s *Sync) GenerateRegtestBlock() error { func (s *Sync) GenerateRegtestBlock() error {
lbrycrd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn) lbrycrd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn)
if err != nil { if err != nil {
return errors.Prefix("error getting lbrycrd client: ", err) return errors.Prefix("error getting lbrycrd client", err)
} }
txs, err := lbrycrd.Generate(1) txs, err := lbrycrd.Generate(1)
if err != nil { if err != nil {
return errors.Prefix("error generating new block: ", err) return errors.Prefix("error generating new block", err)
} }
for _, tx := range txs { for _, tx := range txs {
@ -407,7 +407,7 @@ func (s *Sync) ensureChannelOwnership() error {
} }
thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig()) thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId)
if err != nil { if err != nil {
return err return err
} }
@ -416,7 +416,6 @@ func (s *Sync) ensureChannelOwnership() error {
if channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails != nil { if channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails != nil {
bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL, bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL,
"banner-"+s.DbChannelData.ChannelId, "banner-"+s.DbChannelData.ChannelId,
*s.Manager.AwsConfigs.GetS3AWSConfig(),
) )
if err != nil { if err != nil {
return err return err

View file

@ -358,9 +358,9 @@ func (s *Sync) stopAndUploadWallet(e *error) {
err = s.uploadBlockchainDB() err = s.uploadBlockchainDB()
if err != nil { if err != nil {
if *e == nil { if *e == nil {
e = &err *e = err
} else { } else {
*e = errors.Prefix("failure uploading wallet", *e) *e = errors.Prefix(fmt.Sprintf("failure uploading blockchain DB: %s + original error", errors.FullTrace(err)), *e)
} }
} }
} }
@ -863,7 +863,6 @@ func (s *Sync) enqueueYoutubeVideos() error {
videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{ videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{
VideoDir: s.videoDirectory, VideoDir: s.videoDirectory,
S3Config: *s.Manager.AwsConfigs.GetS3AWSConfig(),
Stopper: s.grp, Stopper: s.grp,
IPPool: ipPool, IPPool: ipPool,
}, s.DbChannelData.LastUploadedVideo) }, s.DbChannelData.LastUploadedVideo)

View file

@ -25,10 +25,9 @@ const (
) )
type APIConfig struct { type APIConfig struct {
YoutubeAPIKey string ApiURL string
ApiURL string ApiToken string
ApiToken string HostName string
HostName string
} }
func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) { func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) {

View file

@ -4,8 +4,6 @@ import (
"encoding/json" "encoding/json"
"time" "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
) )
@ -213,17 +211,3 @@ const (
TransferStateComplete TransferStateComplete
TransferStateManual TransferStateManual
) )
type AwsConfigs struct {
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
}
func (a *AwsConfigs) GetS3AWSConfig() *aws.Config {
return &aws.Config{
Credentials: credentials.NewStaticCredentials(a.AwsS3ID, a.AwsS3Secret, ""),
Region: &a.AwsS3Region,
}
}

View file

@ -37,7 +37,6 @@ import (
"github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/lbry.go/v2/extras/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -55,7 +54,6 @@ type YoutubeVideo struct {
youtubeInfo *ytdl.YtdlVideo youtubeInfo *ytdl.YtdlVideo
youtubeChannelID string youtubeChannelID string
tags []string tags []string
awsConfig aws.Config
thumbnailURL string thumbnailURL string
lbryChannelID string lbryChannelID string
mocked bool mocked bool
@ -101,7 +99,7 @@ var youtubeCategories = map[string]string{
"44": "trailers", "44": "trailers",
} }
func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) (*YoutubeVideo, error) { func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPosition int64, stopGroup *stop.Group, pool *ip_manager.IPPool) (*YoutubeVideo, error) {
// youtube-dl returns times in local timezone sometimes. this could break in the future // youtube-dl returns times in local timezone sometimes. this could break in the future
// maybe we can file a PR to choose the timezone we want from youtube-dl // maybe we can file a PR to choose the timezone we want from youtube-dl
return &YoutubeVideo{ return &YoutubeVideo{
@ -112,19 +110,18 @@ func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPositi
publishedAt: videoData.UploadDateForReal, publishedAt: videoData.UploadDateForReal,
dir: directory, dir: directory,
youtubeInfo: videoData, youtubeInfo: videoData,
awsConfig: awsConfig,
mocked: false, mocked: false,
youtubeChannelID: videoData.ChannelID, youtubeChannelID: videoData.ChannelID,
stopGroup: stopGroup, stopGroup: stopGroup,
pool: pool, pool: pool,
}, nil }, nil
} }
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
return &YoutubeVideo{ return &YoutubeVideo{
id: videoID, id: videoID,
playlistPosition: 0, playlistPosition: 0,
dir: directory, dir: directory,
awsConfig: awsConfig,
mocked: true, mocked: true,
youtubeChannelID: youtubeChannelID, youtubeChannelID: youtubeChannelID,
stopGroup: stopGroup, stopGroup: stopGroup,
@ -688,7 +685,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() (err error) {
if thumbnail.Width == 0 { if thumbnail.Width == 0 {
return errors.Err("default youtube thumbnail found") return errors.Err("default youtube thumbnail found")
} }
v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID(), v.awsConfig) v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID())
return err return err
} }
@ -880,7 +877,7 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
return nil, errors.Err("could not find thumbnail for mocked video") return nil, errors.Err("could not find thumbnail for mocked video")
} }
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Thumbnails) thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Thumbnails)
thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID(), v.awsConfig) thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID())
} else { } else {
thumbnailURL = thumbs.ThumbnailEndpoint + v.ID() thumbnailURL = thumbs.ThumbnailEndpoint + v.ID()
} }

View file

@ -6,6 +6,7 @@ import (
"os" "os"
"strings" "strings"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
@ -83,11 +84,11 @@ func (u *thumbnailUploader) deleteTmpFile() {
log.Infof("failed to delete local thumbnail file: %s", err.Error()) log.Infof("failed to delete local thumbnail file: %s", err.Error())
} }
} }
func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, error) { func MirrorThumbnail(url string, name string) (string, error) {
tu := thumbnailUploader{ tu := thumbnailUploader{
originalUrl: url, originalUrl: url,
name: name, name: name,
s3Config: s3Config, s3Config: *configs.Configuration.AWSThumbnailsS3Config.GetS3AWSConfig(),
} }
err := tu.downloadThumbnail() err := tu.downloadThumbnail()
if err != nil { if err != nil {
@ -100,14 +101,12 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro
return "", err return "", err
} }
ownS3Config := s3Config.Copy(&aws.Config{Endpoint: aws.String("s3.lbry.tech")}) //this is our own S3 storage
tu2 := thumbnailUploader{ tu2 := thumbnailUploader{
originalUrl: url, originalUrl: url,
name: name, name: name,
s3Config: *ownS3Config, s3Config: *configs.Configuration.ThumbnailsS3Config.GetS3AWSConfig(),
} }
//own S3
err = tu2.uploadThumbnail() err = tu2.uploadThumbnail()
if err != nil { if err != nil {
return "", err return "", err

109
util/archive.go Normal file
View file

@ -0,0 +1,109 @@
package util
import (
"archive/tar"
"io"
"io/fs"
"os"
"path/filepath"
"github.com/lbryio/lbry.go/v2/extras/errors"
)
func CreateTarball(tarballFilePath string, filePaths []string) error {
file, err := os.Create(tarballFilePath)
if err != nil {
return errors.Err("Could not create tarball file '%s', got error '%s'", tarballFilePath, err.Error())
}
defer file.Close()
tarWriter := tar.NewWriter(file)
defer tarWriter.Close()
for _, filePath := range filePaths {
err := addFileToTarWriter(filePath, tarWriter)
if err != nil {
return errors.Err("Could not add file '%s', to tarball, got error '%s'", filePath, err.Error())
}
}
return nil
}
func addFileToTarWriter(filePath string, tarWriter *tar.Writer) error {
file, err := os.Open(filePath)
if err != nil {
return errors.Err("Could not open file '%s', got error '%s'", filePath, err.Error())
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return errors.Err("Could not get stat for file '%s', got error '%s'", filePath, err.Error())
}
header := &tar.Header{
Name: stat.Name(),
Size: stat.Size(),
Mode: int64(stat.Mode()),
ModTime: stat.ModTime(),
}
err = tarWriter.WriteHeader(header)
if err != nil {
return errors.Err("Could not write header for file '%s', got error '%s'", filePath, err.Error())
}
_, err = io.Copy(tarWriter, file)
if err != nil {
return errors.Err("Could not copy the file '%s' data to the tarball, got error '%s'", filePath, err.Error())
}
return nil
}
func Untar(tarball, target string) error {
reader, err := os.Open(tarball)
if err != nil {
return errors.Err(err)
}
defer reader.Close()
tarReader := tar.NewReader(reader)
for {
header, err := tarReader.Next()
if err == io.EOF {
break
} else if err != nil {
return errors.Err(err)
}
path := filepath.Join(target, header.Name)
info := header.FileInfo()
if info.IsDir() {
if err = os.MkdirAll(path, info.Mode()); err != nil {
return errors.Err(err)
}
continue
}
err = extractFile(path, info, tarReader)
if err != nil {
return err
}
}
return nil
}
func extractFile(path string, info fs.FileInfo, tarReader *tar.Reader) error {
file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode())
if err != nil {
return errors.Err(err)
}
defer file.Close()
_, err = io.Copy(file, tarReader)
if err != nil {
return errors.Err(err)
}
return nil
}

View file

@ -11,6 +11,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/lbrycrd" "github.com/lbryio/lbry.go/v2/lbrycrd"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/timing" "github.com/lbryio/ytsync/v5/timing"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
@ -185,9 +186,9 @@ func CleanForStartup() error {
return errors.Err(err) return errors.Err(err)
} }
lbrycrd, err := GetLbrycrdClient(os.Getenv("LBRYCRD_STRING")) lbrycrd, err := GetLbrycrdClient(configs.Configuration.LbrycrdString)
if err != nil { if err != nil {
return errors.Prefix("error getting lbrycrd client: ", err) return errors.Prefix("error getting lbrycrd client", err)
} }
height, err := lbrycrd.GetBlockCount() height, err := lbrycrd.GetBlockCount()
if err != nil { if err != nil {

View file

@ -29,7 +29,6 @@ import (
"github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/lbry.go/v2/extras/util"
"github.com/aws/aws-sdk-go/aws"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -50,7 +49,6 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
type VideoParams struct { type VideoParams struct {
VideoDir string VideoDir string
S3Config aws.Config
Stopper *stop.Group Stopper *stop.Group
IPPool *ip_manager.IPPool IPPool *ip_manager.IPPool
} }
@ -103,7 +101,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s
for _, item := range vids { for _, item := range vids {
positionInList := playlistMap[item.ID] positionInList := playlistMap[item.ID]
videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool) videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.Stopper, videoParams.IPPool)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
@ -115,7 +113,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s
continue continue
} }
if _, ok := playlistMap[k]; !ok { if _, ok := playlistMap[k]; !ok {
videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool)) videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.Stopper, videoParams.IPPool))
} }
} }