2019-01-11 03:02:26 +01:00
package manager
2018-07-17 18:54:22 +02:00
import (
"fmt"
2018-08-21 19:17:52 +02:00
"strings"
2018-07-17 20:58:47 +02:00
"syscall"
2018-08-21 19:17:52 +02:00
"time"
2018-07-17 20:58:47 +02:00
2020-06-11 18:45:56 +02:00
"github.com/lbryio/ytsync/v5/blobs_reflector"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
logUtils "github.com/lbryio/ytsync/v5/util"
2018-10-08 22:19:17 +02:00
2019-10-10 16:50:33 +02:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
2019-06-06 23:25:31 +02:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
2018-07-17 18:54:22 +02:00
log "github.com/sirupsen/logrus"
)
type SyncManager struct {
2019-08-30 21:08:28 +02:00
SyncFlags sdk . SyncFlags
maxTries int
refill int
limit int
concurrentJobs int
concurrentVideos int
blobsDir string
videosLimit int
maxVideoSize int
2020-07-28 01:35:07 +02:00
maxVideoLength time . Duration
2019-08-30 21:08:28 +02:00
lbrycrdString string
awsS3ID string
awsS3Secret string
awsS3Region string
syncStatus string
awsS3Bucket string
syncProperties * sdk . SyncProperties
apiConfig * sdk . APIConfig
2018-09-18 21:20:34 +02:00
}
2019-08-30 21:08:28 +02:00
func NewSyncManager ( syncFlags sdk . SyncFlags , maxTries int , refill int , limit int , concurrentJobs int , concurrentVideos int , blobsDir string , videosLimit int ,
2018-09-26 06:08:18 +02:00
maxVideoSize int , lbrycrdString string , awsS3ID string , awsS3Secret string , awsS3Region string , awsS3Bucket string ,
2020-07-28 01:35:07 +02:00
syncStatus string , syncProperties * sdk . SyncProperties , apiConfig * sdk . APIConfig , maxVideoLength time . Duration ) * SyncManager {
2018-09-18 21:20:34 +02:00
return & SyncManager {
2019-08-30 21:08:28 +02:00
SyncFlags : syncFlags ,
maxTries : maxTries ,
refill : refill ,
limit : limit ,
concurrentJobs : concurrentJobs ,
concurrentVideos : concurrentVideos ,
blobsDir : blobsDir ,
videosLimit : videosLimit ,
maxVideoSize : maxVideoSize ,
maxVideoLength : maxVideoLength ,
lbrycrdString : lbrycrdString ,
awsS3ID : awsS3ID ,
awsS3Secret : awsS3Secret ,
awsS3Region : awsS3Region ,
awsS3Bucket : awsS3Bucket ,
syncStatus : syncStatus ,
syncProperties : syncProperties ,
apiConfig : apiConfig ,
2018-09-18 21:20:34 +02:00
}
2018-07-17 18:54:22 +02:00
}
// Channel sync statuses. SyncManager.Start uses several of these as the names
// of the queues it fetches channels from.
const (
	// StatusPending: waiting for permission to sync.
	StatusPending = "pending"
	// StatusPendingEmail: permission granted but missing email.
	StatusPendingEmail = "pendingemail"
	// StatusQueued: in sync queue, will be synced soon.
	StatusQueued = "queued"
	// StatusPendingUpgrade: in sync queue, will be synced soon.
	// NOTE(review): this description is identical to StatusQueued's;
	// presumably the channel is waiting for an upgrade pass — confirm.
	StatusPendingUpgrade = "pendingupgrade"
	// StatusSyncing: syncing now.
	StatusSyncing = "syncing"
	// StatusSynced: done.
	StatusSynced = "synced"
	// StatusFailed has no description in the original source; presumably the
	// sync failed — confirm.
	StatusFailed = "failed"
	// StatusFinalized: no more changes allowed.
	StatusFinalized = "finalized"
	// StatusAbandoned: deleted on youtube or banned.
	StatusAbandoned = "abandoned"
)
2019-08-21 19:40:35 +02:00
// LatestMetadataVersion is the newest metadata version number known to this
// package. It is not referenced in this part of the file; presumably it is
// compared against the version recorded for already-published content —
// confirm against its callers.
const LatestMetadataVersion = 2
2018-07-17 18:54:22 +02:00
2019-05-03 05:11:52 +02:00
// SyncStatuses lists every channel sync status declared above. The order is
// kept exactly as originally written (note that StatusPendingUpgrade precedes
// StatusQueued here, unlike in the constant block).
var SyncStatuses = []string{
	StatusPending,
	StatusPendingEmail,
	StatusPendingUpgrade,
	StatusQueued,
	StatusSyncing,
	StatusSynced,
	StatusFailed,
	StatusFinalized,
	StatusAbandoned,
}
2018-07-17 18:54:22 +02:00
2018-07-21 01:56:36 +02:00
// Per-video statuses.
const (
	VideoStatusPublished     = "published"
	VideoStatusFailed        = "failed"
	VideoStatusUpgradeFailed = "upgradefailed"
	VideoStatusUnpublished   = "unpublished"

	// VideoStatusTransferFailed is the correctly spelled name for the
	// "transferfailed" status.
	VideoStatusTransferFailed = "transferfailed"

	// VideoStatusTranferFailed is the original, misspelled name. It is kept
	// so existing callers continue to compile.
	//
	// Deprecated: use VideoStatusTransferFailed instead.
	VideoStatusTranferFailed = VideoStatusTransferFailed
)
2019-08-21 05:38:36 +02:00
// Transfer states. They are untyped integer constants numbered from 0 in
// declaration order; Start copies a channel's TransferState value into
// Sync.transferState. The meaning of each state is inferred from its name
// only — confirm against the code that consumes them.
const (
	// TransferStateNotTouched has the value 0.
	TransferStateNotTouched = iota
	// TransferStatePending has the value 1.
	TransferStatePending
	// TransferStateComplete has the value 2.
	TransferStateComplete
	// TransferStateManual has the value 3.
	TransferStateManual
)
2018-08-17 16:05:54 +02:00
func ( s * SyncManager ) Start ( ) error {
2018-09-18 21:20:34 +02:00
2019-08-13 05:20:09 +02:00
if logUtils . ShouldCleanOnStartup ( ) {
2019-08-04 00:34:48 +02:00
err := logUtils . CleanForStartup ( )
if err != nil {
return err
}
}
2018-07-17 18:54:22 +02:00
syncCount := 0
for {
err := s . checkUsedSpace ( )
if err != nil {
2019-08-11 04:50:43 +02:00
return errors . Err ( err )
2018-07-17 18:54:22 +02:00
}
var syncs [ ] Sync
shouldInterruptLoop := false
2018-09-26 06:08:18 +02:00
isSingleChannelSync := s . syncProperties . YoutubeChannelID != ""
2018-07-17 18:54:22 +02:00
if isSingleChannelSync {
2018-09-26 06:08:18 +02:00
channels , err := s . apiConfig . FetchChannels ( "" , s . syncProperties )
2018-07-17 18:54:22 +02:00
if err != nil {
2019-08-11 04:50:43 +02:00
return errors . Err ( err )
2018-07-17 18:54:22 +02:00
}
if len ( channels ) != 1 {
return errors . Err ( "Expected 1 channel, %d returned" , len ( channels ) )
}
lbryChannelName := channels [ 0 ] . DesiredChannelName
syncs = make ( [ ] Sync , 1 )
syncs [ 0 ] = Sync {
2019-10-08 01:59:18 +02:00
APIConfig : s . apiConfig ,
YoutubeChannelID : s . syncProperties . YoutubeChannelID ,
LbryChannelName : lbryChannelName ,
lbryChannelID : channels [ 0 ] . ChannelClaimID ,
MaxTries : s . maxTries ,
ConcurrentVideos : s . concurrentVideos ,
Refill : s . refill ,
Manager : s ,
2020-04-10 17:46:07 +02:00
MaxVideoLength : s . maxVideoLength ,
2019-10-08 01:59:18 +02:00
LbrycrdString : s . lbrycrdString ,
AwsS3ID : s . awsS3ID ,
AwsS3Secret : s . awsS3Secret ,
AwsS3Region : s . awsS3Region ,
AwsS3Bucket : s . awsS3Bucket ,
namer : namer . NewNamer ( ) ,
Fee : channels [ 0 ] . Fee ,
clientPublishAddress : channels [ 0 ] . PublishAddress ,
publicKey : channels [ 0 ] . PublicKey ,
transferState : channels [ 0 ] . TransferState ,
2018-07-17 18:54:22 +02:00
}
shouldInterruptLoop = true
} else {
var queuesToSync [ ] string
2018-09-26 06:08:18 +02:00
if s . syncStatus != "" {
queuesToSync = append ( queuesToSync , s . syncStatus )
2019-08-30 21:08:28 +02:00
} else if s . SyncFlags . SyncUpdate {
2018-07-17 18:54:22 +02:00
queuesToSync = append ( queuesToSync , StatusSyncing , StatusSynced )
} else {
queuesToSync = append ( queuesToSync , StatusSyncing , StatusQueued )
}
2019-08-20 11:33:06 +02:00
queues :
2018-07-17 18:54:22 +02:00
for _ , q := range queuesToSync {
2019-07-22 22:16:23 +02:00
//temporary override for sync-until to give tom the time to review the channels
2019-07-22 22:24:13 +02:00
if q == StatusQueued {
2019-12-27 01:27:29 +01:00
s . syncProperties . SyncUntil = time . Now ( ) . Add ( - 8 * time . Hour ) . Unix ( )
2019-07-22 22:24:13 +02:00
}
2018-09-26 06:08:18 +02:00
channels , err := s . apiConfig . FetchChannels ( q , s . syncProperties )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
}
2019-08-20 11:33:06 +02:00
for i , c := range channels {
log . Infof ( "There are %d channels in the \"%s\" queue" , len ( channels ) - i , q )
2020-04-08 23:14:10 +02:00
maxVideoLength := s . maxVideoLength
2020-03-12 04:08:50 +01:00
if c . TotalSubscribers < 1000 {
2020-07-28 01:35:07 +02:00
maxVideoLength = 1 * time . Hour
2020-03-12 04:08:50 +01:00
}
2018-07-17 18:54:22 +02:00
syncs = append ( syncs , Sync {
2019-10-08 01:59:18 +02:00
APIConfig : s . apiConfig ,
YoutubeChannelID : c . ChannelId ,
LbryChannelName : c . DesiredChannelName ,
lbryChannelID : c . ChannelClaimID ,
MaxTries : s . maxTries ,
ConcurrentVideos : s . concurrentVideos ,
2020-04-08 23:14:10 +02:00
MaxVideoLength : maxVideoLength ,
2019-10-08 01:59:18 +02:00
Refill : s . refill ,
Manager : s ,
LbrycrdString : s . lbrycrdString ,
AwsS3ID : s . awsS3ID ,
AwsS3Secret : s . awsS3Secret ,
AwsS3Region : s . awsS3Region ,
AwsS3Bucket : s . awsS3Bucket ,
namer : namer . NewNamer ( ) ,
Fee : c . Fee ,
clientPublishAddress : c . PublishAddress ,
publicKey : c . PublicKey ,
transferState : c . TransferState ,
2018-07-17 18:54:22 +02:00
} )
2019-08-20 11:33:06 +02:00
if q != StatusFailed {
continue queues
}
2018-07-17 18:54:22 +02:00
}
}
}
if len ( syncs ) == 0 {
log . Infoln ( "No channels to sync. Pausing 5 minutes!" )
time . Sleep ( 5 * time . Minute )
}
2019-06-12 22:42:42 +02:00
for _ , sync := range syncs {
2018-08-21 19:17:52 +02:00
shouldNotCount := false
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "Syncing %s (%s) to LBRY! total processed channels since startup: %d" , sync . LbryChannelName , sync . YoutubeChannelID , syncCount + 1 )
2018-07-17 18:54:22 +02:00
err := sync . FullCycle ( )
2019-12-26 17:52:44 +01:00
//TODO: THIS IS A TEMPORARY WORK AROUND FOR THE STUPID IP LOCKUP BUG
ipPool , _ := ip_manager . GetIPPool ( sync . grp )
if ipPool != nil {
ipPool . ReleaseAll ( )
}
2018-07-17 18:54:22 +02:00
if err != nil {
2020-06-10 03:32:45 +02:00
if strings . Contains ( err . Error ( ) , "quotaExceeded" ) {
logUtils . SleepUntilQuotaReset ( )
}
2018-07-17 18:54:22 +02:00
fatalErrors := [ ] string {
"default_wallet already exists" ,
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR" ,
"NotEnoughFunds" ,
"no space left on device" ,
2018-08-21 19:17:52 +02:00
"failure uploading wallet" ,
2019-01-03 14:08:45 +01:00
"the channel in the wallet is different than the channel in the database" ,
2019-01-03 19:55:27 +01:00
"this channel does not belong to this wallet!" ,
2019-09-10 11:43:20 +02:00
"You already have a stream claim published under the name" ,
2018-07-17 18:54:22 +02:00
}
2020-05-30 02:18:20 +02:00
2018-07-24 02:01:35 +02:00
if util . SubstringInSlice ( err . Error ( ) , fatalErrors ) {
2018-07-17 18:54:22 +02:00
return errors . Prefix ( "@Nikooo777 this requires manual intervention! Exiting..." , err )
}
2018-08-21 19:17:52 +02:00
shouldNotCount = strings . Contains ( err . Error ( ) , "this youtube channel is being managed by another server" )
2018-08-22 16:56:02 +02:00
if ! shouldNotCount {
2020-07-29 03:34:08 +02:00
logUtils . SendInfoToSlack ( "A non fatal error was reported by the sync process.\n%s" , errors . FullTrace ( err ) )
2018-08-22 16:56:02 +02:00
}
2018-07-17 18:54:22 +02:00
}
2019-07-10 15:46:54 +02:00
err = blobs_reflector . ReflectAndClean ( )
if err != nil {
return errors . Prefix ( "@Nikooo777 something went wrong while reflecting blobs" , err )
}
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "Syncing %s (%s) reached an end. total processed channels since startup: %d" , sync . LbryChannelName , sync . YoutubeChannelID , syncCount + 1 )
2018-08-21 19:17:52 +02:00
if ! shouldNotCount {
syncCount ++
}
2018-09-26 06:08:18 +02:00
if sync . IsInterrupted ( ) || ( s . limit != 0 && syncCount >= s . limit ) {
2018-07-17 18:54:22 +02:00
shouldInterruptLoop = true
break
}
}
2019-08-30 21:08:28 +02:00
if shouldInterruptLoop || s . SyncFlags . SingleRun {
2018-07-17 18:54:22 +02:00
break
}
}
return nil
}
2019-05-03 05:11:52 +02:00
// GetS3AWSConfig returns an aws.Config populated with static credentials
// built from the manager's S3 ID and secret (with an empty session token) and
// with the manager's S3 region. The Region pointer refers to the manager's
// own awsS3Region field rather than to a copy.
func (s *SyncManager) GetS3AWSConfig() aws.Config {
	staticCreds := credentials.NewStaticCredentials(s.awsS3ID, s.awsS3Secret, "")

	var cfg aws.Config
	cfg.Credentials = staticCreds
	cfg.Region = &s.awsS3Region
	return cfg
}
2018-08-17 16:05:54 +02:00
func ( s * SyncManager ) checkUsedSpace ( ) error {
2019-08-04 00:34:48 +02:00
usedPctile , err := GetUsedSpace ( logUtils . GetBlobsDir ( ) )
2018-07-17 18:54:22 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
return errors . Err ( err )
2018-07-17 18:54:22 +02:00
}
2019-08-30 21:08:28 +02:00
if usedPctile >= 0.90 && ! s . SyncFlags . SkipSpaceCheck {
2018-07-17 18:54:22 +02:00
return errors . Err ( fmt . Sprintf ( "more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%" , usedPctile * 100 ) )
}
2018-08-21 20:07:30 +02:00
log . Infof ( "disk usage: %.1f%%" , usedPctile * 100 )
2018-07-17 18:54:22 +02:00
return nil
}
2018-07-17 20:58:47 +02:00
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty
// and 1 being full, for the filesystem that holds the provided path.
//
// The value is (total blocks - free blocks) / total blocks as reported by
// statfs. The block size is the same for both counts, so it cancels out and
// is not multiplied in. Free blocks here are the filesystem-wide count
// (Bfree), not the count available to unprivileged users (Bavail).
//
// A filesystem that reports zero total blocks (pseudo filesystems such as
// procfs do) yields 0 rather than NaN. An error is returned only when the
// statfs call itself fails.
func GetUsedSpace(path string) (float32, error) {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(path, &stat); err != nil {
		return 0, err
	}

	total := uint64(stat.Blocks)
	if total == 0 {
		// Nothing to divide by; report "empty" instead of 0/0 = NaN, which
		// would silently pass any threshold comparison.
		return 0, nil
	}

	free := uint64(stat.Bfree)
	if free > total {
		// Guard the unsigned subtraction below against wrapping around.
		free = total
	}

	used := total - free
	return float32(float64(used) / float64(total)), nil
}