2019-01-11 03:02:26 +01:00
package manager
2018-07-17 18:54:22 +02:00
import (
"fmt"
2018-08-21 19:17:52 +02:00
"strings"
2020-11-03 21:41:39 +01:00
"sync"
2018-07-17 20:58:47 +02:00
"syscall"
2018-08-21 19:17:52 +02:00
"time"
2018-07-17 20:58:47 +02:00
2020-06-11 18:45:56 +02:00
"github.com/lbryio/ytsync/v5/blobs_reflector"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
2020-08-08 01:12:55 +02:00
"github.com/lbryio/ytsync/v5/shared"
2020-06-11 18:45:56 +02:00
logUtils "github.com/lbryio/ytsync/v5/util"
2018-10-08 22:19:17 +02:00
2019-10-10 16:50:33 +02:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
2019-06-06 23:25:31 +02:00
2018-07-17 18:54:22 +02:00
log "github.com/sirupsen/logrus"
)
type SyncManager struct {
2020-08-08 01:12:55 +02:00
CliFlags shared . SyncFlags
ApiConfig * sdk . APIConfig
LbrycrdDsn string
AwsConfigs * shared . AwsConfigs
blobsDir string
channelsToSync [ ] Sync
2018-09-18 21:20:34 +02:00
}
2020-08-08 01:12:55 +02:00
func NewSyncManager ( cliFlags shared . SyncFlags , blobsDir , lbrycrdDsn string , awsConfigs * shared . AwsConfigs , apiConfig * sdk . APIConfig ) * SyncManager {
2018-09-18 21:20:34 +02:00
return & SyncManager {
2020-08-08 01:12:55 +02:00
CliFlags : cliFlags ,
blobsDir : blobsDir ,
LbrycrdDsn : lbrycrdDsn ,
AwsConfigs : awsConfigs ,
ApiConfig : apiConfig ,
2018-09-18 21:20:34 +02:00
}
2018-07-17 18:54:22 +02:00
}
2020-08-08 01:12:55 +02:00
func ( s * SyncManager ) enqueueChannel ( channel * shared . YoutubeChannel ) {
s . channelsToSync = append ( s . channelsToSync , Sync {
DbChannelData : channel ,
Manager : s ,
namer : namer . NewNamer ( ) ,
2020-11-03 21:41:39 +01:00
hardVideoFailure : hardVideoFailure {
lock : & sync . Mutex { } ,
} ,
2020-08-08 01:12:55 +02:00
} )
}
2019-08-21 05:38:36 +02:00
2018-08-17 16:05:54 +02:00
func ( s * SyncManager ) Start ( ) error {
2019-08-13 05:20:09 +02:00
if logUtils . ShouldCleanOnStartup ( ) {
2019-08-04 00:34:48 +02:00
err := logUtils . CleanForStartup ( )
if err != nil {
return err
}
}
2021-09-27 16:11:07 +02:00
var (
lastChannelProcessed string
secondLastChannelProcessed string
syncCount int
)
2018-07-17 18:54:22 +02:00
for {
2020-08-08 01:12:55 +02:00
s . channelsToSync = make ( [ ] Sync , 0 , 10 ) // reset sync queue
2018-07-17 18:54:22 +02:00
err := s . checkUsedSpace ( )
if err != nil {
2019-08-11 04:50:43 +02:00
return errors . Err ( err )
2018-07-17 18:54:22 +02:00
}
shouldInterruptLoop := false
2020-08-08 01:12:55 +02:00
if s . CliFlags . IsSingleChannelSync ( ) {
channels , err := s . ApiConfig . FetchChannels ( "" , & s . CliFlags )
2018-07-17 18:54:22 +02:00
if err != nil {
2019-08-11 04:50:43 +02:00
return errors . Err ( err )
2018-07-17 18:54:22 +02:00
}
if len ( channels ) != 1 {
return errors . Err ( "Expected 1 channel, %d returned" , len ( channels ) )
}
2020-08-08 01:12:55 +02:00
s . enqueueChannel ( & channels [ 0 ] )
2018-07-17 18:54:22 +02:00
shouldInterruptLoop = true
} else {
var queuesToSync [ ] string
2020-10-27 19:50:10 +01:00
if s . CliFlags . Status != "" {
2020-11-03 21:41:39 +01:00
queuesToSync = append ( queuesToSync , shared . StatusSyncing , s . CliFlags . Status )
2020-08-08 01:12:55 +02:00
} else if s . CliFlags . SyncUpdate {
queuesToSync = append ( queuesToSync , shared . StatusSyncing , shared . StatusSynced )
2018-07-17 18:54:22 +02:00
} else {
2020-08-08 01:12:55 +02:00
queuesToSync = append ( queuesToSync , shared . StatusSyncing , shared . StatusQueued )
2018-07-17 18:54:22 +02:00
}
2020-10-27 19:50:10 +01:00
if s . CliFlags . SecondaryStatus != "" {
queuesToSync = append ( queuesToSync , s . CliFlags . SecondaryStatus )
}
2019-08-20 11:33:06 +02:00
queues :
2018-07-17 18:54:22 +02:00
for _ , q := range queuesToSync {
2020-08-08 01:12:55 +02:00
channels , err := s . ApiConfig . FetchChannels ( q , & s . CliFlags )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
}
2020-08-08 01:12:55 +02:00
log . Infof ( "Currently processing the \"%s\" queue with %d channels" , q , len ( channels ) )
for _ , c := range channels {
s . enqueueChannel ( & c )
queueAll := q == shared . StatusFailed || q == shared . StatusSyncing
if ! queueAll {
break queues
2019-08-20 11:33:06 +02:00
}
2018-07-17 18:54:22 +02:00
}
2020-08-08 01:12:55 +02:00
log . Infof ( "Drained the \"%s\" queue" , q )
2018-07-17 18:54:22 +02:00
}
}
2021-09-27 16:11:07 +02:00
2020-08-08 01:12:55 +02:00
if len ( s . channelsToSync ) == 0 {
2018-07-17 18:54:22 +02:00
log . Infoln ( "No channels to sync. Pausing 5 minutes!" )
time . Sleep ( 5 * time . Minute )
}
2021-09-27 16:11:07 +02:00
2020-08-08 01:12:55 +02:00
for _ , sync := range s . channelsToSync {
2020-11-03 02:14:01 +01:00
if lastChannelProcessed == sync . DbChannelData . ChannelId && secondLastChannelProcessed == lastChannelProcessed {
2020-08-08 01:12:55 +02:00
util . SendToSlack ( "We just killed a sync for %s to stop looping! (%s)" , sync . DbChannelData . DesiredChannelName , sync . DbChannelData . ChannelId )
2020-11-03 02:14:01 +01:00
stopTheLoops := errors . Err ( "Found channel %s running 3 times, set it to failed, and reprocess later" , sync . DbChannelData . DesiredChannelName )
2020-08-03 07:05:03 +02:00
sync . setChannelTerminationStatus ( & stopTheLoops )
continue
}
2020-11-03 02:14:01 +01:00
secondLastChannelProcessed = lastChannelProcessed
2020-08-08 01:12:55 +02:00
lastChannelProcessed = sync . DbChannelData . ChannelId
2018-08-21 19:17:52 +02:00
shouldNotCount := false
2020-08-08 01:12:55 +02:00
logUtils . SendInfoToSlack ( "Syncing %s (%s) to LBRY! total processed channels since startup: %d" , sync . DbChannelData . DesiredChannelName , sync . DbChannelData . ChannelId , syncCount + 1 )
2018-07-17 18:54:22 +02:00
err := sync . FullCycle ( )
2019-12-26 17:52:44 +01:00
//TODO: THIS IS A TEMPORARY WORK AROUND FOR THE STUPID IP LOCKUP BUG
ipPool , _ := ip_manager . GetIPPool ( sync . grp )
if ipPool != nil {
ipPool . ReleaseAll ( )
}
2018-07-17 18:54:22 +02:00
if err != nil {
2020-06-10 03:32:45 +02:00
if strings . Contains ( err . Error ( ) , "quotaExceeded" ) {
logUtils . SleepUntilQuotaReset ( )
}
2018-07-17 18:54:22 +02:00
fatalErrors := [ ] string {
"default_wallet already exists" ,
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR" ,
"NotEnoughFunds" ,
"no space left on device" ,
2018-08-21 19:17:52 +02:00
"failure uploading wallet" ,
2019-01-03 14:08:45 +01:00
"the channel in the wallet is different than the channel in the database" ,
2019-01-03 19:55:27 +01:00
"this channel does not belong to this wallet!" ,
2019-09-10 11:43:20 +02:00
"You already have a stream claim published under the name" ,
2018-07-17 18:54:22 +02:00
}
2020-05-30 02:18:20 +02:00
2018-07-24 02:01:35 +02:00
if util . SubstringInSlice ( err . Error ( ) , fatalErrors ) {
2018-07-17 18:54:22 +02:00
return errors . Prefix ( "@Nikooo777 this requires manual intervention! Exiting..." , err )
}
2018-08-21 19:17:52 +02:00
shouldNotCount = strings . Contains ( err . Error ( ) , "this youtube channel is being managed by another server" )
2018-08-22 16:56:02 +02:00
if ! shouldNotCount {
2020-07-29 03:34:08 +02:00
logUtils . SendInfoToSlack ( "A non fatal error was reported by the sync process.\n%s" , errors . FullTrace ( err ) )
2018-08-22 16:56:02 +02:00
}
2018-07-17 18:54:22 +02:00
}
2021-06-17 17:51:21 +02:00
err = logUtils . CleanupMetadata ( )
if err != nil {
log . Errorf ( "something went wrong while trying to clear out the video metadata directory: %s" , errors . FullTrace ( err ) )
}
2019-07-10 15:46:54 +02:00
err = blobs_reflector . ReflectAndClean ( )
if err != nil {
return errors . Prefix ( "@Nikooo777 something went wrong while reflecting blobs" , err )
}
2020-08-08 01:12:55 +02:00
logUtils . SendInfoToSlack ( "%s (%s) reached an end. Total processed channels since startup: %d" , sync . DbChannelData . DesiredChannelName , sync . DbChannelData . ChannelId , syncCount + 1 )
2018-08-21 19:17:52 +02:00
if ! shouldNotCount {
syncCount ++
}
2020-08-08 01:12:55 +02:00
if sync . IsInterrupted ( ) || ( s . CliFlags . Limit != 0 && syncCount >= s . CliFlags . Limit ) {
2018-07-17 18:54:22 +02:00
shouldInterruptLoop = true
break
}
}
2020-08-08 01:12:55 +02:00
if shouldInterruptLoop || s . CliFlags . SingleRun {
2018-07-17 18:54:22 +02:00
break
}
}
2021-09-27 16:11:07 +02:00
2018-07-17 18:54:22 +02:00
return nil
}
2020-08-08 01:12:55 +02:00
2018-08-17 16:05:54 +02:00
func ( s * SyncManager ) checkUsedSpace ( ) error {
2019-08-04 00:34:48 +02:00
usedPctile , err := GetUsedSpace ( logUtils . GetBlobsDir ( ) )
2018-07-17 18:54:22 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
return errors . Err ( err )
2018-07-17 18:54:22 +02:00
}
2020-08-08 01:12:55 +02:00
if usedPctile >= 0.90 && ! s . CliFlags . SkipSpaceCheck {
2018-07-17 18:54:22 +02:00
return errors . Err ( fmt . Sprintf ( "more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%" , usedPctile * 100 ) )
}
2018-08-21 20:07:30 +02:00
log . Infof ( "disk usage: %.1f%%" , usedPctile * 100 )
2018-07-17 18:54:22 +02:00
return nil
}
2018-07-17 20:58:47 +02:00
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty
// and 1 being full, for the filesystem that holds the provided path.
//
// Usage is computed as (total blocks - free blocks) / total blocks from the
// statfs result. A filesystem that reports zero total blocks yields 0 rather
// than NaN. An error is returned when the statfs call fails, for example when
// path does not exist.
func GetUsedSpace(path string) (float32, error) {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(path, &stat); err != nil {
		return 0, err
	}

	total := stat.Blocks
	if total == 0 {
		// Nothing to measure; avoid a 0/0 division that would produce NaN.
		return 0, nil
	}

	// The block size cancels out of the ratio, so block counts are compared
	// directly. The division is done in float64 to keep precision on large
	// volumes before narrowing to the float32 return type.
	used := total - stat.Bfree
	return float32(float64(used) / float64(total)), nil
}