refactor structures

fix various bugs
This commit is contained in:
Niko Storni 2020-08-08 01:12:55 +02:00
parent 2a33f44317
commit 7c02c5b92d
10 changed files with 346 additions and 460 deletions

112
main.go
View file

@ -11,6 +11,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -23,39 +24,11 @@ var Version string
const defaultMaxTries = 3
var (
flags sdk.SyncFlags
maxTries int
refill int
limit int
syncStatus string
channelID string
syncFrom int64
syncUntil int64
concurrentJobs int
videosLimit int
maxVideoSize int
cliFlags shared.SyncFlags
maxVideoLength int
)
func main() {
//grp := stop.New()
//ipPool, err := ip_manager.GetIPPool(grp)
//if err != nil {
// panic(err)
//}
//
//videoID := "vtIzMaLkCaM"
//
//ip, err := ipPool.GetIP(videoID)
//if err != nil {
// panic(err)
//}
//
//spew.Dump(ip)
//
//spew.Dump(downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)}))
//return
rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel)
http.Handle("/metrics", promhttp.Handler())
@ -69,24 +42,24 @@ func main() {
Args: cobra.RangeArgs(0, 0),
}
cmd.Flags().BoolVar(&flags.StopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
cmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
cmd.Flags().BoolVar(&flags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
cmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
cmd.Flags().BoolVar(&flags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&flags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&flags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&flags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().BoolVar(&flags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().BoolVar(&flags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
cmd.Flags().BoolVar(&flags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
cmd.Flags().Int64Var(&syncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
cmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
cmd.Flags().IntVar(&videosLimit, "videos-limit", 1000, "how many videos to process per channel")
cmd.Flags().IntVar(&maxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
cmd.Flags().BoolVar(&cliFlags.StopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
cmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
cmd.Flags().StringVar(&cliFlags.SyncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
cmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
cmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 1000, "how many videos to process per channel")
cmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
cmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)")
if err := cmd.Execute(); err != nil {
@ -114,29 +87,30 @@ func ytSync(cmd *cobra.Command, args []string) {
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if syncStatus != "" && !util.InSlice(syncStatus, manager.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", manager.SyncStatuses)
if cliFlags.SyncStatus != "" && !util.InSlice(cliFlags.SyncStatus, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return
}
if flags.StopOnError && maxTries != defaultMaxTries {
if cliFlags.StopOnError && cliFlags.MaxTries != defaultMaxTries {
log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
return
}
if maxTries < 1 {
if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if limit < 0 {
if cliFlags.Limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdString := os.Getenv("LBRYCRD_STRING")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
@ -169,42 +143,30 @@ func ytSync(cmd *cobra.Command, args []string) {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdString == "" {
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
syncProperties := &sdk.SyncProperties{
SyncFrom: syncFrom,
SyncUntil: syncUntil,
YoutubeChannelID: channelID,
}
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
flags,
maxTries,
refill,
limit,
concurrentJobs,
concurrentJobs,
cliFlags,
blobsDir,
videosLimit,
maxVideoSize,
lbrycrdString,
awsS3ID,
awsS3Secret,
awsS3Region,
awsS3Bucket,
syncStatus,
syncProperties,
lbrycrdDsn,
awsConfig,
apiConfig,
time.Duration(maxVideoLength)*time.Hour,
)
err := sm.Start()
if err != nil {

View file

@ -10,94 +10,43 @@ import (
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
log "github.com/sirupsen/logrus"
)
type SyncManager struct {
SyncFlags sdk.SyncFlags
maxTries int
refill int
limit int
concurrentJobs int
concurrentVideos int
CliFlags shared.SyncFlags
ApiConfig *sdk.APIConfig
LbrycrdDsn string
AwsConfigs *shared.AwsConfigs
blobsDir string
videosLimit int
maxVideoSize int
maxVideoLength time.Duration
lbrycrdString string
awsS3ID string
awsS3Secret string
awsS3Region string
syncStatus string
awsS3Bucket string
syncProperties *sdk.SyncProperties
apiConfig *sdk.APIConfig
channelsToSync []Sync
}
func NewSyncManager(syncFlags sdk.SyncFlags, maxTries int, refill int, limit int, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength time.Duration) *SyncManager {
func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, awsConfigs *shared.AwsConfigs, apiConfig *sdk.APIConfig) *SyncManager {
return &SyncManager{
SyncFlags: syncFlags,
maxTries: maxTries,
refill: refill,
limit: limit,
concurrentJobs: concurrentJobs,
concurrentVideos: concurrentVideos,
CliFlags: cliFlags,
blobsDir: blobsDir,
videosLimit: videosLimit,
maxVideoSize: maxVideoSize,
maxVideoLength: maxVideoLength,
lbrycrdString: lbrycrdString,
awsS3ID: awsS3ID,
awsS3Secret: awsS3Secret,
awsS3Region: awsS3Region,
awsS3Bucket: awsS3Bucket,
syncStatus: syncStatus,
syncProperties: syncProperties,
apiConfig: apiConfig,
LbrycrdDsn: lbrycrdDsn,
AwsConfigs: awsConfigs,
ApiConfig: apiConfig,
}
}
const (
StatusPending = "pending" // waiting for permission to sync
StatusPendingEmail = "pendingemail" // permission granted but missing email
StatusQueued = "queued" // in sync queue. will be synced soon
StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done
StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned
)
const LatestMetadataVersion = 2
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned}
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusUnpublished = "unpublished"
VideoStatusTranferFailed = "transferfailed"
)
const (
TransferStateNotTouched = iota
TransferStatePending
TransferStateComplete
TransferStateManual
)
func (s *SyncManager) enqueueChannel(channel *shared.YoutubeChannel) {
s.channelsToSync = append(s.channelsToSync, Sync{
DbChannelData: channel,
Manager: s,
namer: namer.NewNamer(),
})
}
func (s *SyncManager) Start() error {
if logUtils.ShouldCleanOnStartup() {
err := logUtils.CleanForStartup()
if err != nil {
@ -108,119 +57,63 @@ func (s *SyncManager) Start() error {
var lastChannelProcessed string
syncCount := 0
for {
s.channelsToSync = make([]Sync, 0, 10) // reset sync queue
err := s.checkUsedSpace()
if err != nil {
return errors.Err(err)
}
var syncs []Sync
shouldInterruptLoop := false
isSingleChannelSync := s.syncProperties.YoutubeChannelID != ""
if isSingleChannelSync {
channels, err := s.apiConfig.FetchChannels("", s.syncProperties)
if s.CliFlags.IsSingleChannelSync() {
channels, err := s.ApiConfig.FetchChannels("", &s.CliFlags)
if err != nil {
return errors.Err(err)
}
if len(channels) != 1 {
return errors.Err("Expected 1 channel, %d returned", len(channels))
}
lbryChannelName := channels[0].DesiredChannelName
syncs = make([]Sync, 1)
s.maxVideoLength = time.Duration(channels[0].LengthLimit) * time.Minute
s.maxVideoSize = channels[0].SizeLimit
syncs[0] = Sync{
APIConfig: s.apiConfig,
YoutubeChannelID: s.syncProperties.YoutubeChannelID,
LbryChannelName: lbryChannelName,
lbryChannelID: channels[0].ChannelClaimID,
MaxTries: s.maxTries,
ConcurrentVideos: s.concurrentVideos,
Refill: s.refill,
Manager: s,
MaxVideoLength: s.maxVideoLength,
LbrycrdString: s.lbrycrdString,
AwsS3ID: s.awsS3ID,
AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
namer: namer.NewNamer(),
Fee: channels[0].Fee,
clientPublishAddress: channels[0].PublishAddress,
publicKey: channels[0].PublicKey,
transferState: channels[0].TransferState,
LastUploadedVideo: channels[0].LastUploadedVideo,
}
s.enqueueChannel(&channels[0])
shouldInterruptLoop = true
} else {
var queuesToSync []string
if s.syncStatus != "" {
queuesToSync = append(queuesToSync, s.syncStatus)
} else if s.SyncFlags.SyncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
if s.CliFlags.SyncStatus != "" {
queuesToSync = append(queuesToSync, s.CliFlags.SyncStatus)
} else if s.CliFlags.SyncUpdate {
queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusSynced)
} else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusQueued)
}
queues:
for _, q := range queuesToSync {
//temporary override for sync-until to give tom the time to review the channels
if q == StatusQueued {
s.syncProperties.SyncUntil = time.Now().Add(-8 * time.Hour).Unix()
}
channels, err := s.apiConfig.FetchChannels(q, s.syncProperties)
channels, err := s.ApiConfig.FetchChannels(q, &s.CliFlags)
if err != nil {
return err
}
for i, c := range channels {
log.Infof("There are %d channels in the \"%s\" queue", len(channels)-i, q)
maxVideoLength := s.maxVideoLength
if c.TotalSubscribers < 1000 {
maxVideoLength = 1 * time.Hour
}
maxVideoLength = time.Duration(c.LengthLimit) * time.Minute
s.maxVideoSize = c.SizeLimit
syncs = append(syncs, Sync{
APIConfig: s.apiConfig,
YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName,
lbryChannelID: c.ChannelClaimID,
MaxTries: s.maxTries,
ConcurrentVideos: s.concurrentVideos,
MaxVideoLength: maxVideoLength,
Refill: s.refill,
Manager: s,
LbrycrdString: s.lbrycrdString,
AwsS3ID: s.awsS3ID,
AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
namer: namer.NewNamer(),
Fee: c.Fee,
clientPublishAddress: c.PublishAddress,
publicKey: c.PublicKey,
transferState: c.TransferState,
LastUploadedVideo: c.LastUploadedVideo,
})
if q != StatusFailed {
continue queues
log.Infof("Currently processing the \"%s\" queue with %d channels", q, len(channels))
for _, c := range channels {
s.enqueueChannel(&c)
queueAll := q == shared.StatusFailed || q == shared.StatusSyncing
if !queueAll {
break queues
}
}
log.Infof("Drained the \"%s\" queue", q)
}
}
if len(syncs) == 0 {
if len(s.channelsToSync) == 0 {
log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute)
}
for _, sync := range syncs {
if lastChannelProcessed == sync.LbryChannelName {
util.SendToSlack("We just killed a sync for %s to stop looping!(%s)", sync.LbryChannelName, sync.YoutubeChannelID)
stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.LbryChannelName)
for _, sync := range s.channelsToSync {
if lastChannelProcessed == sync.DbChannelData.ChannelId {
util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId)
stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.DbChannelData.DesiredChannelName)
sync.setChannelTerminationStatus(&stopTheLoops)
continue
}
lastChannelProcessed = sync.LbryChannelName
lastChannelProcessed = sync.DbChannelData.ChannelId
shouldNotCount := false
logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1)
err := sync.FullCycle()
//TODO: THIS IS A TEMPORARY WORK AROUND FOR THE STUPID IP LOCKUP BUG
ipPool, _ := ip_manager.GetIPPool(sync.grp)
@ -255,33 +148,28 @@ func (s *SyncManager) Start() error {
if err != nil {
return errors.Prefix("@Nikooo777 something went wrong while reflecting blobs", err)
}
logUtils.SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
logUtils.SendInfoToSlack("%s (%s) reached an end. Total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1)
if !shouldNotCount {
syncCount++
}
if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) {
if sync.IsInterrupted() || (s.CliFlags.Limit != 0 && syncCount >= s.CliFlags.Limit) {
shouldInterruptLoop = true
break
}
}
if shouldInterruptLoop || s.SyncFlags.SingleRun {
if shouldInterruptLoop || s.CliFlags.SingleRun {
break
}
}
return nil
}
func (s *SyncManager) GetS3AWSConfig() aws.Config {
return aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsS3ID, s.awsS3Secret, ""),
Region: &s.awsS3Region,
}
}
func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(logUtils.GetBlobsDir())
if err != nil {
return errors.Err(err)
}
if usedPctile >= 0.90 && !s.SyncFlags.SkipSpaceCheck {
if usedPctile >= 0.90 && !s.CliFlags.SkipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
}
log.Infof("disk usage: %.1f%%", usedPctile*100)

View file

@ -6,7 +6,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@ -17,8 +16,7 @@ import (
)
func (s *Sync) getS3Downloader() (*s3manager.Downloader, error) {
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig())
if err != nil {
return nil, errors.Prefix("error starting session: ", err)
}
@ -26,8 +24,7 @@ func (s *Sync) getS3Downloader() (*s3manager.Downloader, error) {
return downloader, nil
}
func (s *Sync) getS3Uploader() (*s3manager.Uploader, error) {
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig())
if err != nil {
return nil, errors.Prefix("error starting session: ", err)
}
@ -51,7 +48,7 @@ func (s *Sync) downloadWallet() error {
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.AwsS3Bucket),
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Key: key,
})
if err != nil {
@ -112,7 +109,7 @@ func (s *Sync) downloadBlockchainDB() error {
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.AwsS3Bucket),
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Key: key,
})
if err != nil {
@ -146,11 +143,11 @@ func (s *Sync) downloadBlockchainDB() error {
func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string, err error) {
defaultWallet = os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
tempWallet = os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key = aws.String("/wallets/" + s.YoutubeChannelID)
key = aws.String("/wallets/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() {
defaultWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
tempWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
key = aws.String("/regtest/" + s.DbChannelData.ChannelId)
}
lbryumDir := os.Getenv("LBRYUM_DIR")
@ -176,20 +173,20 @@ func (s *Sync) getBlockchainDBPaths() (defaultDB, tempDB string, key *string, er
}
defaultDB = lbryumDir + "/lbc_mainnet/blockchain.db"
tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.db"
key = aws.String("/blockchain_dbs/" + s.YoutubeChannelID)
key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() {
defaultDB = lbryumDir + "/lbc_regtest/blockchain.db"
tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.db"
key = aws.String("/regtest_dbs/" + s.YoutubeChannelID)
key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId)
}
return
}
func (s *Sync) uploadWallet() error {
defaultWalletDir := logUtils.GetDefaultWalletPath()
key := aws.String("/wallets/" + s.YoutubeChannelID)
key := aws.String("/wallets/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() {
key = aws.String("/regtest/" + s.YoutubeChannelID)
key = aws.String("/regtest/" + s.DbChannelData.ChannelId)
}
if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) {
@ -208,7 +205,7 @@ func (s *Sync) uploadWallet() error {
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.AwsS3Bucket),
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Key: key,
Body: file,
})
@ -241,7 +238,7 @@ func (s *Sync) uploadBlockchainDB() error {
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.AwsS3Bucket),
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Key: key,
Body: file,
})

View file

@ -9,6 +9,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/ytapi"
@ -72,7 +73,7 @@ func (s *Sync) walletSetup() error {
}
log.Debugf("Starting balance is %.4f", balance)
videosOnYoutube, err := ytapi.CountVideosInChannel(s.YoutubeChannelID)
videosOnYoutube, err := ytapi.CountVideosInChannel(s.DbChannelData.ChannelId)
if err != nil {
return err
}
@ -100,17 +101,17 @@ func (s *Sync) walletSetup() error {
log.Debugf("We already allocated credits for %d published videos and %d failed videos", publishedCount, failedCount)
if videosOnYoutube > s.Manager.videosLimit {
videosOnYoutube = s.Manager.videosLimit
if videosOnYoutube > s.Manager.CliFlags.VideosLimit {
videosOnYoutube = s.Manager.CliFlags.VideosLimit
}
unallocatedVideos := videosOnYoutube - (publishedCount + failedCount)
channelFee := channelClaimAmount
channelAlreadyClaimed := s.lbryChannelID != ""
channelAlreadyClaimed := s.DbChannelData.ChannelClaimID != ""
if channelAlreadyClaimed {
channelFee = 0.0
}
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelFee
if s.Manager.SyncFlags.UpgradeMetadata {
if s.Manager.CliFlags.UpgradeMetadata {
requiredBalance += float64(notUpgradedCount) * 0.001
}
@ -119,8 +120,8 @@ func (s *Sync) walletSetup() error {
refillAmount = math.Max(math.Max(requiredBalance-balance, minimumAccountBalance-balance), minimumRefillAmount)
}
if s.Refill > 0 {
refillAmount += float64(s.Refill)
if s.Manager.CliFlags.Refill > 0 {
refillAmount += float64(s.Manager.CliFlags.Refill)
}
if refillAmount > 0 {
@ -136,12 +137,11 @@ func (s *Sync) walletSetup() error {
} else if claimAddress == nil {
return errors.Err("could not get an address")
}
s.claimAddress = string(claimAddress.Items[0].Address)
if s.claimAddress == "" {
return errors.Err("found blank claim address")
if s.DbChannelData.PublishAddress == "" || !s.shouldTransfer() {
s.DbChannelData.PublishAddress = string(claimAddress.Items[0].Address)
}
if s.shouldTransfer() {
s.claimAddress = s.clientPublishAddress
if s.DbChannelData.PublishAddress == "" {
return errors.Err("found blank claim address")
}
err = s.ensureEnoughUTXOs()
@ -300,7 +300,7 @@ func (s *Sync) waitForNewBlock() error {
}
func (s *Sync) GenerateRegtestBlock() error {
lbrycrd, err := logUtils.GetLbrycrdClient(s.LbrycrdString)
lbrycrd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn)
if err != nil {
return errors.Prefix("error getting lbrycrd client: ", err)
}
@ -319,7 +319,7 @@ func (s *Sync) GenerateRegtestBlock() error {
func (s *Sync) ensureChannelOwnership() error {
defer func(start time.Time) { timing.TimedComponent("ensureChannelOwnership").Add(time.Since(start)) }(time.Now())
if s.LbryChannelName == "" {
if s.DbChannelData.DesiredChannelName == "" {
return errors.Err("no channel name set")
}
@ -332,27 +332,27 @@ func (s *Sync) ensureChannelOwnership() error {
var channelToUse *jsonrpc.Transaction
if len((*channels).Items) > 0 {
if s.lbryChannelID == "" {
if s.DbChannelData.ChannelClaimID == "" {
return errors.Err("this channel does not have a recorded claimID in the database. To prevent failures, updates are not supported until an entry is manually added in the database")
}
for _, c := range (*channels).Items {
log.Debugf("checking listed channel %s (%s)", c.ClaimID, c.Name)
if c.ClaimID != s.lbryChannelID {
if c.ClaimID != s.DbChannelData.ChannelClaimID {
continue
}
if c.Name != s.LbryChannelName {
if c.Name != s.DbChannelData.DesiredChannelName {
return errors.Err("the channel in the wallet is different than the channel in the database")
}
channelToUse = &c
break
}
if channelToUse == nil {
return errors.Err("this wallet has channels but not a single one is ours! Expected claim_id: %s (%s)", s.lbryChannelID, s.LbryChannelName)
return errors.Err("this wallet has channels but not a single one is ours! Expected claim_id: %s (%s)", s.DbChannelData.ChannelClaimID, s.DbChannelData.DesiredChannelName)
}
} else if s.transferState == TransferStateComplete {
} else if s.DbChannelData.TransferState == shared.TransferStateComplete {
return errors.Err("the channel was transferred but appears to have been abandoned!")
} else if s.lbryChannelID != "" {
return errors.Err("the database has a channel recorded (%s) but nothing was found in our control", s.lbryChannelID)
} else if s.DbChannelData.ChannelClaimID != "" {
return errors.Err("the database has a channel recorded (%s) but nothing was found in our control", s.DbChannelData.ChannelClaimID)
}
channelUsesOldMetadata := false
@ -383,20 +383,23 @@ func (s *Sync) ensureChannelOwnership() error {
}
}
channelInfo, err := ytapi.ChannelInfo(s.YoutubeChannelID)
channelInfo, err := ytapi.ChannelInfo(s.DbChannelData.ChannelId)
if err != nil {
return err
}
thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig())
if err != nil {
return err
}
var bannerURL *string
if channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails != nil {
bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL, "banner-"+s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL,
"banner-"+s.DbChannelData.ChannelId,
*s.Manager.AwsConfigs.GetS3AWSConfig(),
)
if err != nil {
return err
}
@ -419,14 +422,14 @@ func (s *Sync) ensureChannelOwnership() error {
claimCreateOptions := jsonrpc.ClaimCreateOptions{
Title: &channelInfo.Microformat.MicroformatDataRenderer.Title,
Description: &channelInfo.Microformat.MicroformatDataRenderer.Description,
Tags: tags_manager.GetTagsForChannel(s.YoutubeChannelID),
Tags: tags_manager.GetTagsForChannel(s.DbChannelData.ChannelId),
Languages: languages,
Locations: locations,
ThumbnailURL: &thumbnailURL,
}
if channelUsesOldMetadata {
if s.transferState <= 1 {
c, err = s.daemon.ChannelUpdate(s.lbryChannelID, jsonrpc.ChannelUpdateOptions{
if s.DbChannelData.TransferState <= 1 {
c, err = s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, jsonrpc.ChannelUpdateOptions{
ClearTags: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true),
ClearLanguages: util.PtrToBool(true),
@ -436,11 +439,11 @@ func (s *Sync) ensureChannelOwnership() error {
},
})
} else {
logUtils.SendInfoToSlack("%s (%s) has a channel with old metadata but isn't in our control anymore. Ignoring", s.LbryChannelName, s.lbryChannelID)
logUtils.SendInfoToSlack("%s (%s) has a channel with old metadata but isn't in our control anymore. Ignoring", s.DbChannelData.DesiredChannelName, s.DbChannelData.ChannelClaimID)
return nil
}
} else {
c, err = s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
c, err = s.daemon.ChannelCreate(s.DbChannelData.DesiredChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
})
@ -450,8 +453,8 @@ func (s *Sync) ensureChannelOwnership() error {
return err
}
s.lbryChannelID = c.Outputs[0].ClaimID
return s.Manager.apiConfig.SetChannelClaimID(s.YoutubeChannelID, s.lbryChannelID)
s.DbChannelData.ChannelClaimID = c.Outputs[0].ClaimID
return s.Manager.ApiConfig.SetChannelClaimID(s.DbChannelData.ChannelId, s.DbChannelData.ChannelClaimID)
}
func (s *Sync) addCredits(amountToAdd float64) error {
@ -460,7 +463,7 @@ func (s *Sync) addCredits(amountToAdd float64) error {
timing.TimedComponent("addCredits").Add(time.Since(start))
}(start)
log.Printf("Adding %f credits", amountToAdd)
lbrycrdd, err := logUtils.GetLbrycrdClient(s.LbrycrdString)
lbrycrdd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn)
if err != nil {
return err
}

View file

@ -10,7 +10,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/timing"
log "github.com/sirupsen/logrus"
@ -97,7 +97,7 @@ func abandonSupports(s *Sync) (float64, error) {
//TODO: remove this once the SDK team fixes their RPC bugs....
s.daemon.SetRPCTimeout(60 * time.Second)
defer s.daemon.SetRPCTimeout(5 * time.Minute)
for i := 0; i < s.ConcurrentVideos; i++ {
for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ {
consumerWG.Add(1)
go func() {
defer consumerWG.Done()
@ -189,7 +189,7 @@ func abandonSupports(s *Sync) (float64, error) {
type updateInfo struct {
ClaimID string
streamUpdateOptions *jsonrpc.StreamUpdateOptions
videoStatus *sdk.VideoStatus
videoStatus *shared.VideoStatus
}
func transferVideos(s *Sync) error {
@ -199,7 +199,7 @@ func transferVideos(s *Sync) error {
}(start)
cleanTransfer := true
streamChan := make(chan updateInfo, s.ConcurrentVideos)
streamChan := make(chan updateInfo, s.Manager.CliFlags.ConcurrentJobs)
account, err := s.getDefaultAccount()
if err != nil {
return err
@ -213,13 +213,13 @@ func transferVideos(s *Sync) error {
go func() {
defer producerWG.Done()
for _, video := range s.syncedVideos {
if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion {
if !video.Published || video.Transferred || video.MetadataVersion != shared.LatestMetadataVersion {
continue
}
var stream *jsonrpc.Claim = nil
for _, c := range streams.Items {
if c.ClaimID != video.ClaimID || (c.SigningChannel != nil && c.SigningChannel.ClaimID != s.lbryChannelID) {
if c.ClaimID != video.ClaimID || (c.SigningChannel != nil && c.SigningChannel.ClaimID != s.DbChannelData.ChannelClaimID) {
continue
}
stream = &c
@ -232,7 +232,7 @@ func transferVideos(s *Sync) error {
streamUpdateOptions := jsonrpc.StreamUpdateOptions{
StreamCreateOptions: &jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
ClaimAddress: &s.clientPublishAddress,
ClaimAddress: &s.DbChannelData.PublishAddress,
FundingAccountIDs: []string{
account,
},
@ -240,12 +240,12 @@ func transferVideos(s *Sync) error {
},
Bid: util.PtrToString("0.005"), // Todo - Dont hardcode
}
videoStatus := sdk.VideoStatus{
ChannelID: s.YoutubeChannelID,
videoStatus := shared.VideoStatus{
ChannelID: s.DbChannelData.ChannelId,
VideoID: video.VideoID,
ClaimID: video.ClaimID,
ClaimName: video.ClaimName,
Status: VideoStatusPublished,
Status: shared.VideoStatusPublished,
IsTransferred: util.PtrToBool(true),
}
streamChan <- updateInfo{
@ -257,7 +257,7 @@ func transferVideos(s *Sync) error {
}()
consumerWG := &stop.Group{}
for i := 0; i < s.ConcurrentVideos; i++ {
for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ {
consumerWG.Add(1)
go func(worker int) {
defer consumerWG.Done()
@ -290,13 +290,13 @@ func (s *Sync) streamUpdate(ui *updateInfo) error {
timing.TimedComponent("transferStreamUpdate").Add(time.Since(start))
if updateError != nil {
ui.videoStatus.FailureReason = updateError.Error()
ui.videoStatus.Status = VideoStatusTranferFailed
ui.videoStatus.Status = shared.VideoStatusTranferFailed
ui.videoStatus.IsTransferred = util.PtrToBool(false)
} else {
ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0)
}
log.Infof("TRANSFERRED %t", *ui.videoStatus.IsTransferred)
statusErr := s.APIConfig.MarkVideoStatus(*ui.videoStatus)
statusErr := s.Manager.ApiConfig.MarkVideoStatus(*ui.videoStatus)
if statusErr != nil {
return errors.Prefix(statusErr.Error(), updateError)
}
@ -318,7 +318,7 @@ func transferChannel(s *Sync) error {
}
var channelClaim *jsonrpc.Transaction = nil
for _, c := range channelClaims.Items {
if c.ClaimID != s.lbryChannelID {
if c.ClaimID != s.DbChannelData.ChannelClaimID {
continue
}
channelClaim = &c
@ -332,11 +332,11 @@ func transferChannel(s *Sync) error {
Bid: util.PtrToString(fmt.Sprintf("%.6f", channelClaimAmount-0.005)),
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
ClaimAddress: &s.clientPublishAddress,
ClaimAddress: &s.DbChannelData.PublishAddress,
},
},
}
result, err := s.daemon.ChannelUpdate(s.lbryChannelID, updateOptions)
result, err := s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, updateOptions)
if err != nil {
return errors.Err(err)
}

View file

@ -15,6 +15,7 @@ import (
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/sources"
"github.com/lbryio/ytsync/v5/thumbs"
"github.com/lbryio/ytsync/v5/timing"
@ -40,35 +41,18 @@ const (
// Sync stores the options that control how syncing happens
type Sync struct {
APIConfig *sdk.APIConfig
YoutubeChannelID string
LbryChannelName string
MaxTries int
ConcurrentVideos int
Refill int
DbChannelData *shared.YoutubeChannel
Manager *SyncManager
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
Fee *sdk.Fee
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
syncedVideosMux *sync.RWMutex
syncedVideos map[string]sdk.SyncedVideo
grp *stop.Group
lbryChannelID string
namer *namer.Namer
walletMux *sync.RWMutex
queue chan ytapi.Video
transferState int
clientPublishAddress string
publicKey string
defaultAccountID string
MaxVideoLength time.Duration
LastUploadedVideo string
}
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) {
@ -96,7 +80,7 @@ func (s *Sync) IsInterrupted() bool {
}
func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "", nil)
syncedVideos, claimNames, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusSyncing, "", nil)
if err != nil {
return err
}
@ -107,24 +91,12 @@ func (s *Sync) setStatusSyncing() error {
return nil
}
func (s *Sync) setExceptions() {
if s.YoutubeChannelID == "UCwjQfNRW6sGYb__pd7d4nUg" { //@FreeTalkLive
s.MaxVideoLength = 9999 * time.Hour // skips max length checks
s.Manager.maxVideoSize = 0
}
}
var stopGroup = stop.New()
func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found")
}
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
s.setExceptions()
defer timing.ClearTimings()
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.RWMutex{}
@ -201,7 +173,7 @@ func (s *Sync) FullCycle() (e error) {
func (s *Sync) processTransfers() (e error) {
log.Println("Processing transfers")
if s.transferState != 2 {
if s.DbChannelData.TransferState != 2 {
err := waitConfirmations(s)
if err != nil {
return err
@ -212,7 +184,7 @@ func (s *Sync) processTransfers() (e error) {
return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err)
}
if supportAmount > 0 {
logUtils.SendInfoToSlack("(%s) %.6f LBCs were abandoned and should be used as support", s.YoutubeChannelID, supportAmount)
logUtils.SendInfoToSlack("(%s) %.6f LBCs were abandoned and should be used as support", s.DbChannelData.ChannelId, supportAmount)
}
err = transferVideos(s)
if err != nil {
@ -233,14 +205,14 @@ func (s *Sync) processTransfers() (e error) {
return err
}
isTip := true
summary, err := s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil)
summary, err := s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil)
if err != nil {
if strings.Contains(err.Error(), "tx-size") { //TODO: this is a silly workaround and should be written in an recursive function
summary, err = s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil)
summary, err = s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil)
if err != nil {
return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err)
}
summary, err = s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil)
summary, err = s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil)
if err != nil {
return errors.Err(err)
}
@ -267,7 +239,7 @@ func deleteSyncFolder(videoDirectory string) {
}
func (s *Sync) shouldTransfer() bool {
return s.transferState >= 1 && s.clientPublishAddress != "" && !s.Manager.SyncFlags.DisableTransfers
return s.DbChannelData.TransferState >= 1 && s.DbChannelData.PublishAddress != "" && !s.Manager.CliFlags.DisableTransfers
}
func (s *Sync) setChannelTerminationStatus(e *error) {
@ -275,7 +247,7 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
if s.shouldTransfer() {
if *e == nil {
transferState = util.PtrToInt(TransferStateComplete)
transferState = util.PtrToInt(shared.TransferStateComplete)
}
}
if *e != nil {
@ -288,13 +260,13 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
return
}
failureReason := (*e).Error()
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason, transferState)
_, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusFailed, failureReason, transferState)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName)
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.DbChannelData.DesiredChannelName)
*e = errors.Prefix(msg+err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "", transferState)
_, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusSynced, "", transferState)
if err != nil {
*e = err
}
@ -397,7 +369,7 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims {
if !isYtsyncClaim(c, s.lbryChannelID) {
if !isYtsyncClaim(c, s.DbChannelData.ChannelClaimID) {
continue
}
tn := c.Value.GetThumbnail().GetUrl()
@ -415,7 +387,7 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
claimToAbandon = cl
videoIDs[videoID] = c
}
if claimToAbandon.Address != s.clientPublishAddress && !s.syncedVideos[videoID].Transferred {
if claimToAbandon.Address != s.DbChannelData.PublishAddress && !s.syncedVideos[videoID].Transferred {
log.Debugf("abandoning %+v", claimToAbandon)
_, err := s.daemon.StreamAbandon(claimToAbandon.Txid, claimToAbandon.Nout, nil, false)
if err != nil {
@ -444,7 +416,7 @@ type ytsyncClaim struct {
func (s *Sync) mapFromClaims(claims []jsonrpc.Claim) map[string]ytsyncClaim {
videoIDMap := make(map[string]ytsyncClaim, len(claims))
for _, c := range claims {
if !isYtsyncClaim(c, s.lbryChannelID) {
if !isYtsyncClaim(c, s.DbChannelData.ChannelClaimID) {
continue
}
tn := c.Value.GetThumbnail().GetUrl()
@ -513,10 +485,10 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim)
}
fixed++
log.Debugf("updating %s in the database", videoID)
err = s.Manager.apiConfig.MarkVideoStatus(sdk.VideoStatus{
ChannelID: s.YoutubeChannelID,
err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{
ChannelID: s.DbChannelData.ChannelId,
VideoID: videoID,
Status: VideoStatusPublished,
Status: shared.VideoStatusPublished,
ClaimID: chainInfo.ClaimID,
ClaimName: chainInfo.ClaimName,
Size: util.PtrToInt64(int64(claimSize)),
@ -562,13 +534,13 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim)
}
_, ok := ownClaimsInfo[vID]
if !ok && sv.Published {
log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.SyncFlags.RemoveDBUnpublished)
log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.CliFlags.RemoveDBUnpublished)
idsToRemove = append(idsToRemove, vID)
}
}
if s.Manager.SyncFlags.RemoveDBUnpublished && len(idsToRemove) > 0 {
if s.Manager.CliFlags.RemoveDBUnpublished && len(idsToRemove) > 0 {
log.Infof("removing: %s", strings.Join(idsToRemove, ","))
err := s.Manager.apiConfig.DeleteVideos(idsToRemove)
err := s.Manager.ApiConfig.DeleteVideos(idsToRemove)
if err != nil {
return count, fixed, len(idsToRemove), err
}
@ -599,7 +571,7 @@ func (s *Sync) getClaims(defaultOnly bool) ([]jsonrpc.Claim, error) {
}
items := make([]jsonrpc.Claim, 0, len(claims.Items))
for _, c := range claims.Items {
if c.SigningChannel != nil && c.SigningChannel.ClaimID == s.lbryChannelID {
if c.SigningChannel != nil && c.SigningChannel.ClaimID == s.DbChannelData.ChannelClaimID {
items = append(items, c)
}
}
@ -656,11 +628,11 @@ func (s *Sync) checkIntegrity() error {
}
if pubsOnWallet > pubsOnDB { //This case should never happen
logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
}
_, err = s.getUnsentSupports() //TODO: use the returned value when it works
@ -693,24 +665,24 @@ func (s *Sync) doSync() error {
return err
}
if s.transferState < TransferStateComplete {
cert, err := s.daemon.ChannelExport(s.lbryChannelID, nil, nil)
if s.DbChannelData.TransferState < shared.TransferStateComplete {
cert, err := s.daemon.ChannelExport(s.DbChannelData.ChannelClaimID, nil, nil)
if err != nil {
return errors.Prefix("error getting channel cert", err)
}
if cert != nil {
err = s.APIConfig.SetChannelCert(string(*cert), s.lbryChannelID)
err = s.Manager.ApiConfig.SetChannelCert(string(*cert), s.DbChannelData.ChannelClaimID)
if err != nil {
return errors.Prefix("error setting channel cert", err)
}
}
}
if s.Manager.SyncFlags.StopOnError {
if s.Manager.CliFlags.StopOnError {
log.Println("Will stop publishing if an error is detected")
}
for i := 0; i < s.ConcurrentVideos; i++ {
for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ {
s.grp.Add(1)
go func(i int) {
defer s.grp.Done()
@ -718,7 +690,7 @@ func (s *Sync) doSync() error {
}(i)
}
if s.LbryChannelName == "@UCBerkeley" {
if s.DbChannelData.DesiredChannelName == "@UCBerkeley" {
err = errors.Err("UCB is not supported in this version of YTSYNC")
} else {
err = s.enqueueYoutubeVideos()
@ -780,9 +752,9 @@ func (s *Sync) startWorker(workerNum int) {
"Couldn't find private key for id",
"You already have a stream claim published under the name",
}
if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.SyncFlags.StopOnError {
if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.CliFlags.StopOnError {
s.grp.Stop()
} else if s.MaxTries > 1 {
} else if s.Manager.CliFlags.MaxTries > 1 {
errorsNoRetry := []string{
"non 200 status code received",
"This video contains content from",
@ -812,7 +784,7 @@ func (s *Sync) startWorker(workerNum int) {
}
if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries {
} else if tryCount < s.Manager.CliFlags.MaxTries {
if util.SubstringInSlice(err.Error(), []string{
"txn-mempool-conflict",
"too-long-mempool-chain",
@ -861,14 +833,14 @@ func (s *Sync) startWorker(workerNum int) {
existingClaimSize = existingClaim.Size
}
}
videoStatus := VideoStatusFailed
videoStatus := shared.VideoStatusFailed
if strings.Contains(err.Error(), "upgrade failed") {
videoStatus = VideoStatusUpgradeFailed
videoStatus = shared.VideoStatusUpgradeFailed
} else {
s.AppendSyncedVideo(v.ID(), false, err.Error(), existingClaimName, existingClaimID, 0, existingClaimSize)
}
err = s.Manager.apiConfig.MarkVideoStatus(sdk.VideoStatus{
ChannelID: s.YoutubeChannelID,
err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{
ChannelID: s.DbChannelData.ChannelId,
VideoID: v.ID(),
Status: videoStatus,
ClaimID: existingClaimID,
@ -893,12 +865,12 @@ func (s *Sync) enqueueYoutubeVideos() error {
return err
}
videos, err := ytapi.GetVideosToSync(s.APIConfig, s.YoutubeChannelID, s.syncedVideos, s.Manager.SyncFlags.QuickSync, s.Manager.videosLimit, ytapi.VideoParams{
videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosLimit, ytapi.VideoParams{
VideoDir: s.videoDirectory,
S3Config: s.Manager.GetS3AWSConfig(),
S3Config: *s.Manager.AwsConfigs.GetS3AWSConfig(),
Stopper: s.grp,
IPPool: ipPool,
}, s.LastUploadedVideo)
}, s.DbChannelData.LastUploadedVideo)
if err != nil {
return err
}
@ -944,7 +916,7 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) {
s.syncedVideosMux.RUnlock()
newMetadataVersion := int8(2)
alreadyPublished := ok && sv.Published
videoRequiresUpgrade := ok && s.Manager.SyncFlags.UpgradeMetadata && sv.MetadataVersion < newMetadataVersion
videoRequiresUpgrade := ok && s.Manager.CliFlags.UpgradeMetadata && sv.MetadataVersion < newMetadataVersion
neverRetryFailures := []string{
"Error extracting sts from embedded url response",
@ -972,7 +944,7 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) {
return nil
}
if !videoRequiresUpgrade && v.PlaylistPosition() >= s.Manager.videosLimit {
if !videoRequiresUpgrade && v.PlaylistPosition() >= s.Manager.CliFlags.VideosLimit {
log.Println(v.ID() + " is old: skipping")
return nil
}
@ -985,13 +957,13 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) {
return err
}
sp := sources.SyncParams{
ClaimAddress: s.claimAddress,
ClaimAddress: s.DbChannelData.PublishAddress,
Amount: publishAmount,
ChannelID: s.lbryChannelID,
MaxVideoSize: s.Manager.maxVideoSize,
ChannelID: s.DbChannelData.ChannelClaimID,
MaxVideoSize: s.DbChannelData.SizeLimit,
Namer: s.namer,
MaxVideoLength: s.MaxVideoLength,
Fee: s.Fee,
MaxVideoLength: time.Duration(s.DbChannelData.LengthLimit) * time.Minute,
Fee: s.DbChannelData.Fee,
DefaultAccount: da,
}
@ -1001,14 +973,14 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) {
}
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName, summary.ClaimID, newMetadataVersion, *v.Size())
err = s.Manager.apiConfig.MarkVideoStatus(sdk.VideoStatus{
ChannelID: s.YoutubeChannelID,
err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{
ChannelID: s.DbChannelData.ChannelId,
VideoID: v.ID(),
Status: VideoStatusPublished,
Status: shared.VideoStatusPublished,
ClaimID: summary.ClaimID,
ClaimName: summary.ClaimName,
Size: v.Size(),
MetaDataVersion: LatestMetadataVersion,
MetaDataVersion: shared.LatestMetadataVersion,
IsTransferred: util.PtrToBool(s.shouldTransfer()),
})
if err != nil {
@ -1019,7 +991,7 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) {
}
func (s *Sync) importPublicKey() error {
if s.publicKey != "" {
if s.DbChannelData.PublicKey != "" {
accountsResponse, err := s.daemon.AccountList(1, 50)
if err != nil {
return errors.Err(err)
@ -1030,13 +1002,13 @@ func (s *Sync) importPublicKey() error {
}
for _, a := range accountsResponse.Items {
if *a.Ledger == ledger {
if a.PublicKey == s.publicKey {
if a.PublicKey == s.DbChannelData.PublicKey {
return nil
}
}
}
log.Infof("Could not find public key %s in the wallet. Importing it...", s.publicKey)
_, err = s.daemon.AccountAdd(s.LbryChannelName, nil, nil, &s.publicKey, util.PtrToBool(true), nil)
log.Infof("Could not find public key %s in the wallet. Importing it...", s.DbChannelData.PublicKey)
_, err = s.daemon.AccountAdd(s.DbChannelData.DesiredChannelName, nil, nil, &s.DbChannelData.PublicKey, util.PtrToBool(true), nil)
return errors.Err(err)
}
return nil
@ -1048,7 +1020,7 @@ func (s *Sync) getUnsentSupports() (float64, error) {
if err != nil {
return 0, errors.Err(err)
}
if s.transferState == 2 {
if s.DbChannelData.TransferState == 2 {
balance, err := s.daemon.AccountBalance(&defaultAccount)
if err != nil {
return 0, err
@ -1079,8 +1051,8 @@ func (s *Sync) getUnsentSupports() (float64, error) {
}
}
}
if balanceAmount > 10 && sentSupports < 1 {
logUtils.SendErrorToSlack("(%s) this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it", s.YoutubeChannelID, balanceAmount, sentSupports)
if balanceAmount > 10 && sentSupports < 1 && s.DbChannelData.TransferState > 1 {
logUtils.SendErrorToSlack("(%s) this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it", s.DbChannelData.ChannelId, balanceAmount, sentSupports)
return balanceAmount - 10, nil
}
}

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/null"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/util"
@ -30,59 +31,21 @@ type APIConfig struct {
HostName string
}
type SyncProperties struct {
SyncFrom int64
SyncUntil int64
YoutubeChannelID string
}
type SyncFlags struct {
StopOnError bool
TakeOverExistingChannel bool
SkipSpaceCheck bool
SyncUpdate bool
SingleRun bool
RemoveDBUnpublished bool
UpgradeMetadata bool
DisableTransfers bool
QuickSync bool
}
type Fee struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
}
type YoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
TotalSubscribers uint `json:"total_subscribers"`
DesiredChannelName string `json:"desired_channel_name"`
Fee *Fee `json:"fee"`
ChannelClaimID string `json:"channel_claim_id"`
TransferState int `json:"transfer_state"`
PublishAddress string `json:"publish_address"`
PublicKey string `json:"public_key"`
LengthLimit int `json:"length_limit"`
SizeLimit int `json:"size_limit"`
LastUploadedVideo string `json:"last_uploaded_video"`
}
func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) {
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []YoutubeChannel `json:"data"`
Data []shared.YoutubeChannel `json:"data"`
}
endpoint := a.ApiURL + "/yt/jobs"
res, err := http.PostForm(endpoint, url.Values{
"auth_token": {a.ApiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(cp.SyncFrom))},
"before": {strconv.Itoa(int(cp.SyncUntil))},
"after": {strconv.Itoa(int(cliFlags.SyncFrom))},
"before": {strconv.Itoa(int(cliFlags.SyncUntil))},
"sync_server": {a.HostName},
"channel_id": {cp.YoutubeChannelID},
"channel_id": {cliFlags.ChannelID},
})
if err != nil {
return nil, errors.Err(err)
@ -93,7 +56,7 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC
util.SendErrorToSlack("Error %d while trying to call %s. Waiting to retry", res.StatusCode, endpoint)
log.Debugln(string(body))
time.Sleep(30 * time.Second)
return a.FetchChannels(status, cp)
return a.FetchChannels(status, cliFlags)
}
var response apiJobsResponse
err = json.Unmarshal(body, &response)
@ -129,7 +92,6 @@ func sanitizeFailureReason(s *string) {
}
func (a *APIConfig) SetChannelCert(certHex string, channelID string) error {
type apiSetChannelCertResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
@ -300,19 +262,7 @@ func (a *APIConfig) DeleteVideos(videos []string) error {
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
type VideoStatus struct {
ChannelID string
VideoID string
Status string
ClaimID string
ClaimName string
FailureReason string
Size *int64
MetaDataVersion uint
IsTransferred *bool
}
func (a *APIConfig) MarkVideoStatus(status VideoStatus) error {
func (a *APIConfig) MarkVideoStatus(status shared.VideoStatus) error {
endpoint := a.ApiURL + "/yt/video_status"
sanitizeFailureReason(&status.FailureReason)

112
shared/shared.go Normal file
View file

@ -0,0 +1,112 @@
package shared
import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
)
type Fee struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
}
type YoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
TotalSubscribers uint `json:"total_subscribers"`
DesiredChannelName string `json:"desired_channel_name"`
Fee *Fee `json:"fee"`
ChannelClaimID string `json:"channel_claim_id"`
TransferState int `json:"transfer_state"`
PublishAddress string `json:"publish_address"`
PublicKey string `json:"public_key"`
LengthLimit int `json:"length_limit"`
SizeLimit int `json:"size_limit"`
LastUploadedVideo string `json:"last_uploaded_video"`
}
type SyncFlags struct {
StopOnError bool
TakeOverExistingChannel bool
SkipSpaceCheck bool
SyncUpdate bool
SingleRun bool
RemoveDBUnpublished bool
UpgradeMetadata bool
DisableTransfers bool
QuickSync bool
MaxTries int
Refill int
Limit int
SyncStatus string
ChannelID string
SyncFrom int64
SyncUntil int64
ConcurrentJobs int
VideosLimit int
MaxVideoSize int
MaxVideoLength time.Duration
}
func (f *SyncFlags) IsSingleChannelSync() bool {
return f.ChannelID != ""
}
type VideoStatus struct {
ChannelID string
VideoID string
Status string
ClaimID string
ClaimName string
FailureReason string
Size *int64
MetaDataVersion uint
IsTransferred *bool
}
const (
StatusPending = "pending" // waiting for permission to sync
StatusPendingEmail = "pendingemail" // permission granted but missing email
StatusQueued = "queued" // in sync queue. will be synced soon
StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done
StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned
)
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned}
const LatestMetadataVersion = 2
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusUnpublished = "unpublished"
VideoStatusTranferFailed = "transferfailed"
)
const (
TransferStateNotTouched = iota
TransferStatePending
TransferStateComplete
TransferStateManual
)
type AwsConfigs struct {
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
}
func (a *AwsConfigs) GetS3AWSConfig() *aws.Config {
return &aws.Config{
Credentials: credentials.NewStaticCredentials(a.AwsS3ID, a.AwsS3Secret, ""),
Region: &a.AwsS3Region,
}
}

View file

@ -13,6 +13,7 @@ import (
"time"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
@ -430,7 +431,7 @@ type SyncParams struct {
MaxVideoSize int
Namer *namer.Namer
MaxVideoLength time.Duration
Fee *sdk.Fee
Fee *shared.Fee
DefaultAccount string
}

View file

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
@ -208,7 +209,7 @@ func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopC
}
video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool)
if err != nil {
errSDK := config.MarkVideoStatus(sdk.VideoStatus{
errSDK := config.MarkVideoStatus(shared.VideoStatus{
ChannelID: channelID,
VideoID: videoID,
Status: "failed",