2019-01-11 03:02:26 +01:00
package manager
2018-07-17 18:54:22 +02:00
import (
"fmt"
2018-08-21 19:17:52 +02:00
"strings"
2018-07-17 20:58:47 +02:00
"syscall"
2018-08-21 19:17:52 +02:00
"time"
2018-07-17 20:58:47 +02:00
2019-07-10 15:46:54 +02:00
"github.com/lbryio/ytsync/blobs_reflector"
2018-10-08 22:19:17 +02:00
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
2019-07-15 22:16:02 +02:00
logUtils "github.com/lbryio/ytsync/util"
2018-10-08 22:19:17 +02:00
2019-01-11 02:34:34 +01:00
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/util"
2019-06-06 23:25:31 +02:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
2018-07-17 18:54:22 +02:00
log "github.com/sirupsen/logrus"
)
type SyncManager struct {
2018-09-26 06:08:18 +02:00
stopOnError bool
maxTries int
takeOverExistingChannel bool
refill int
limit int
skipSpaceCheck bool
syncUpdate bool
concurrentJobs int
concurrentVideos int
blobsDir string
videosLimit int
maxVideoSize int
2019-01-03 19:55:27 +01:00
maxVideoLength float64
2018-09-26 06:08:18 +02:00
lbrycrdString string
awsS3ID string
awsS3Secret string
awsS3Region string
syncStatus string
awsS3Bucket string
singleRun bool
syncProperties * sdk . SyncProperties
apiConfig * sdk . APIConfig
2019-06-04 22:21:40 +02:00
removeDBUnpublished bool
2019-06-06 16:24:20 +02:00
upgradeMetadata bool
2018-09-18 21:20:34 +02:00
}
2018-09-26 06:08:18 +02:00
func NewSyncManager ( stopOnError bool , maxTries int , takeOverExistingChannel bool , refill int , limit int ,
skipSpaceCheck bool , syncUpdate bool , concurrentJobs int , concurrentVideos int , blobsDir string , videosLimit int ,
maxVideoSize int , lbrycrdString string , awsS3ID string , awsS3Secret string , awsS3Region string , awsS3Bucket string ,
2019-06-06 16:24:20 +02:00
syncStatus string , singleRun bool , syncProperties * sdk . SyncProperties , apiConfig * sdk . APIConfig , maxVideoLength float64 , removeDBUnpublished bool , upgradeMetadata bool ) * SyncManager {
2018-09-18 21:20:34 +02:00
return & SyncManager {
2018-09-26 06:08:18 +02:00
stopOnError : stopOnError ,
maxTries : maxTries ,
takeOverExistingChannel : takeOverExistingChannel ,
refill : refill ,
limit : limit ,
skipSpaceCheck : skipSpaceCheck ,
syncUpdate : syncUpdate ,
concurrentJobs : concurrentJobs ,
concurrentVideos : concurrentVideos ,
blobsDir : blobsDir ,
videosLimit : videosLimit ,
maxVideoSize : maxVideoSize ,
2019-01-03 19:55:27 +01:00
maxVideoLength : maxVideoLength ,
2018-09-26 06:08:18 +02:00
lbrycrdString : lbrycrdString ,
awsS3ID : awsS3ID ,
awsS3Secret : awsS3Secret ,
awsS3Region : awsS3Region ,
awsS3Bucket : awsS3Bucket ,
syncStatus : syncStatus ,
singleRun : singleRun ,
syncProperties : syncProperties ,
apiConfig : apiConfig ,
2019-06-04 22:21:40 +02:00
removeDBUnpublished : removeDBUnpublished ,
2019-06-06 16:24:20 +02:00
upgradeMetadata : upgradeMetadata ,
2018-09-18 21:20:34 +02:00
}
2018-07-17 18:54:22 +02:00
}
const (
2019-05-03 05:11:52 +02:00
StatusPending = "pending" // waiting for permission to sync
StatusPendingEmail = "pendingemail" // permission granted but missing email
StatusQueued = "queued" // in sync queue. will be synced soon
StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done
StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned
2018-07-17 18:54:22 +02:00
)
2019-05-03 05:11:52 +02:00
var SyncStatuses = [ ] string { StatusPending , StatusPendingEmail , StatusPendingUpgrade , StatusQueued , StatusSyncing , StatusSynced , StatusFailed , StatusFinalized , StatusAbandoned }
2018-07-17 18:54:22 +02:00
2018-07-21 01:56:36 +02:00
const (
2019-06-10 21:59:42 +02:00
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusUnpublished = "unpublished"
2018-07-21 01:56:36 +02:00
)
2018-08-17 16:05:54 +02:00
func ( s * SyncManager ) Start ( ) error {
2018-09-18 21:20:34 +02:00
2018-07-17 18:54:22 +02:00
syncCount := 0
for {
err := s . checkUsedSpace ( )
if err != nil {
return err
}
var syncs [ ] Sync
shouldInterruptLoop := false
2018-09-26 06:08:18 +02:00
isSingleChannelSync := s . syncProperties . YoutubeChannelID != ""
2018-07-17 18:54:22 +02:00
if isSingleChannelSync {
2018-09-26 06:08:18 +02:00
channels , err := s . apiConfig . FetchChannels ( "" , s . syncProperties )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
}
if len ( channels ) != 1 {
return errors . Err ( "Expected 1 channel, %d returned" , len ( channels ) )
}
lbryChannelName := channels [ 0 ] . DesiredChannelName
syncs = make ( [ ] Sync , 1 )
syncs [ 0 ] = Sync {
2018-09-26 06:08:18 +02:00
APIConfig : s . apiConfig ,
YoutubeChannelID : s . syncProperties . YoutubeChannelID ,
2018-07-17 18:54:22 +02:00
LbryChannelName : lbryChannelName ,
2018-12-25 01:23:40 +01:00
lbryChannelID : channels [ 0 ] . ChannelClaimID ,
2018-09-26 06:08:18 +02:00
StopOnError : s . stopOnError ,
MaxTries : s . maxTries ,
ConcurrentVideos : s . concurrentVideos ,
TakeOverExistingChannel : s . takeOverExistingChannel ,
Refill : s . refill ,
2018-08-17 16:05:54 +02:00
Manager : s ,
2018-09-26 06:08:18 +02:00
LbrycrdString : s . lbrycrdString ,
AwsS3ID : s . awsS3ID ,
AwsS3Secret : s . awsS3Secret ,
AwsS3Region : s . awsS3Region ,
AwsS3Bucket : s . awsS3Bucket ,
2018-10-09 21:57:07 +02:00
namer : namer . NewNamer ( ) ,
2019-06-06 02:16:07 +02:00
Fee : channels [ 0 ] . Fee ,
2018-07-17 18:54:22 +02:00
}
shouldInterruptLoop = true
} else {
var queuesToSync [ ] string
2019-06-12 22:42:42 +02:00
//TODO: implement scrambling to avoid starvation of queues
2018-09-26 06:08:18 +02:00
if s . syncStatus != "" {
queuesToSync = append ( queuesToSync , s . syncStatus )
} else if s . syncUpdate {
2018-07-17 18:54:22 +02:00
queuesToSync = append ( queuesToSync , StatusSyncing , StatusSynced )
} else {
queuesToSync = append ( queuesToSync , StatusSyncing , StatusQueued )
}
for _ , q := range queuesToSync {
2019-07-22 22:16:23 +02:00
//temporary override for sync-until to give tom the time to review the channels
2019-07-22 22:24:13 +02:00
if q == StatusQueued {
s . syncProperties . SyncUntil = time . Now ( ) . AddDate ( 0 , 0 , - 1 ) . Unix ( )
}
2018-09-26 06:08:18 +02:00
channels , err := s . apiConfig . FetchChannels ( q , s . syncProperties )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
}
2019-06-12 22:42:42 +02:00
log . Infof ( "There are %d channels in the \"%s\" queue" , len ( channels ) , q )
if len ( channels ) > 0 {
c := channels [ 0 ]
2018-07-17 18:54:22 +02:00
syncs = append ( syncs , Sync {
2018-09-26 06:08:18 +02:00
APIConfig : s . apiConfig ,
2018-07-17 18:54:22 +02:00
YoutubeChannelID : c . ChannelId ,
LbryChannelName : c . DesiredChannelName ,
2018-12-25 01:23:40 +01:00
lbryChannelID : c . ChannelClaimID ,
2018-09-26 06:08:18 +02:00
StopOnError : s . stopOnError ,
MaxTries : s . maxTries ,
ConcurrentVideos : s . concurrentVideos ,
TakeOverExistingChannel : s . takeOverExistingChannel ,
Refill : s . refill ,
2018-08-17 16:05:54 +02:00
Manager : s ,
2018-09-26 06:08:18 +02:00
LbrycrdString : s . lbrycrdString ,
AwsS3ID : s . awsS3ID ,
AwsS3Secret : s . awsS3Secret ,
AwsS3Region : s . awsS3Region ,
AwsS3Bucket : s . awsS3Bucket ,
2018-10-09 21:57:07 +02:00
namer : namer . NewNamer ( ) ,
2019-06-06 02:16:07 +02:00
Fee : c . Fee ,
2018-07-17 18:54:22 +02:00
} )
2019-06-24 10:14:27 +02:00
continue
2018-07-17 18:54:22 +02:00
}
}
}
if len ( syncs ) == 0 {
log . Infoln ( "No channels to sync. Pausing 5 minutes!" )
time . Sleep ( 5 * time . Minute )
}
2019-06-12 22:42:42 +02:00
for _ , sync := range syncs {
2018-08-21 19:17:52 +02:00
shouldNotCount := false
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "Syncing %s (%s) to LBRY! total processed channels since startup: %d" , sync . LbryChannelName , sync . YoutubeChannelID , syncCount + 1 )
2018-07-17 18:54:22 +02:00
err := sync . FullCycle ( )
if err != nil {
fatalErrors := [ ] string {
"default_wallet already exists" ,
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR" ,
"NotEnoughFunds" ,
"no space left on device" ,
2018-08-21 19:17:52 +02:00
"failure uploading wallet" ,
2019-01-03 14:08:45 +01:00
"the channel in the wallet is different than the channel in the database" ,
2019-01-03 19:55:27 +01:00
"this channel does not belong to this wallet!" ,
2018-07-17 18:54:22 +02:00
}
2018-07-24 02:01:35 +02:00
if util . SubstringInSlice ( err . Error ( ) , fatalErrors ) {
2018-07-17 18:54:22 +02:00
return errors . Prefix ( "@Nikooo777 this requires manual intervention! Exiting..." , err )
}
2018-08-21 19:17:52 +02:00
shouldNotCount = strings . Contains ( err . Error ( ) , "this youtube channel is being managed by another server" )
2018-08-22 16:56:02 +02:00
if ! shouldNotCount {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "A non fatal error was reported by the sync process. %s\nContinuing..." , err . Error ( ) )
2018-08-22 16:56:02 +02:00
}
2018-07-17 18:54:22 +02:00
}
2019-07-10 15:46:54 +02:00
err = blobs_reflector . ReflectAndClean ( )
if err != nil {
return errors . Prefix ( "@Nikooo777 something went wrong while reflecting blobs" , err )
}
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "Syncing %s (%s) reached an end. total processed channels since startup: %d" , sync . LbryChannelName , sync . YoutubeChannelID , syncCount + 1 )
2018-08-21 19:17:52 +02:00
if ! shouldNotCount {
syncCount ++
}
2018-09-26 06:08:18 +02:00
if sync . IsInterrupted ( ) || ( s . limit != 0 && syncCount >= s . limit ) {
2018-07-17 18:54:22 +02:00
shouldInterruptLoop = true
break
}
}
2018-09-26 06:08:18 +02:00
if shouldInterruptLoop || s . singleRun {
2018-07-17 18:54:22 +02:00
break
}
}
return nil
}
2019-05-03 05:11:52 +02:00
func ( s * SyncManager ) GetS3AWSConfig ( ) aws . Config {
return aws . Config {
Credentials : credentials . NewStaticCredentials ( s . awsS3ID , s . awsS3Secret , "" ) ,
Region : & s . awsS3Region ,
}
}
2018-08-17 16:05:54 +02:00
func ( s * SyncManager ) checkUsedSpace ( ) error {
2018-09-26 06:08:18 +02:00
usedPctile , err := GetUsedSpace ( s . blobsDir )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
}
2018-09-26 06:08:18 +02:00
if usedPctile >= 0.90 && ! s . skipSpaceCheck {
2018-07-17 18:54:22 +02:00
return errors . Err ( fmt . Sprintf ( "more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%" , usedPctile * 100 ) )
}
2018-08-21 20:07:30 +02:00
log . Infof ( "disk usage: %.1f%%" , usedPctile * 100 )
2018-07-17 18:54:22 +02:00
return nil
}
2018-07-17 20:58:47 +02:00
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path
func GetUsedSpace ( path string ) ( float32 , error ) {
var stat syscall . Statfs_t
err := syscall . Statfs ( path , & stat )
if err != nil {
return 0 , err
}
// Available blocks * size per block = available space in bytes
all := stat . Blocks * uint64 ( stat . Bsize )
free := stat . Bfree * uint64 ( stat . Bsize )
used := all - free
return float32 ( used ) / float32 ( all ) , nil
}