package manager import ( "fmt" "strings" "sync" "syscall" "time" "github.com/lbryio/ytsync/v5/blobs_reflector" "github.com/lbryio/ytsync/v5/configs" "github.com/lbryio/ytsync/v5/ip_manager" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/shared" logUtils "github.com/lbryio/ytsync/v5/util" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/util" log "github.com/sirupsen/logrus" ) type SyncManager struct { CliFlags shared.SyncFlags ApiConfig *sdk.APIConfig LbrycrdDsn string blobsDir string channelsToSync []Sync } func NewSyncManager(cliFlags shared.SyncFlags, blobsDir string) *SyncManager { return &SyncManager{ CliFlags: cliFlags, blobsDir: blobsDir, LbrycrdDsn: configs.Configuration.LbrycrdString, ApiConfig: sdk.GetAPIsConfigs(), } } func (s *SyncManager) enqueueChannel(channel *shared.YoutubeChannel) { s.channelsToSync = append(s.channelsToSync, Sync{ DbChannelData: channel, Manager: s, namer: namer.NewNamer(), hardVideoFailure: hardVideoFailure{ lock: &sync.Mutex{}, }, }) } func (s *SyncManager) Start() error { if logUtils.ShouldCleanOnStartup() { err := logUtils.CleanForStartup() if err != nil { return err } } var lastChannelProcessed string var secondLastChannelProcessed string syncCount := 0 for { s.channelsToSync = make([]Sync, 0, 10) // reset sync queue err := s.checkUsedSpace() if err != nil { return errors.Err(err) } shouldInterruptLoop := false if s.CliFlags.IsSingleChannelSync() { channels, err := s.ApiConfig.FetchChannels("", &s.CliFlags) if err != nil { return errors.Err(err) } if len(channels) != 1 { return errors.Err("Expected 1 channel, %d returned", len(channels)) } s.enqueueChannel(&channels[0]) shouldInterruptLoop = true } else { var queuesToSync []string if s.CliFlags.Status != "" { queuesToSync = append(queuesToSync, shared.StatusSyncing, s.CliFlags.Status) } else if s.CliFlags.SyncUpdate { queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusSynced) } else { queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusQueued) } if s.CliFlags.SecondaryStatus != "" { queuesToSync = append(queuesToSync, s.CliFlags.SecondaryStatus) } queues: for _, q := range queuesToSync { channels, err := s.ApiConfig.FetchChannels(q, &s.CliFlags) if err != nil { return err } log.Infof("Currently processing the \"%s\" queue with %d channels", q, len(channels)) for _, c := range channels { s.enqueueChannel(&c) queueAll := q == shared.StatusFailed || q == shared.StatusSyncing if !queueAll { break queues } } log.Infof("Drained the \"%s\" queue", q) } } if len(s.channelsToSync) == 0 { log.Infoln("No channels to sync. Pausing 5 minutes!") time.Sleep(5 * time.Minute) } for _, sync := range s.channelsToSync { if lastChannelProcessed == sync.DbChannelData.ChannelId && secondLastChannelProcessed == lastChannelProcessed { util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId) stopTheLoops := errors.Err("Found channel %s running 3 times, set it to failed, and reprocess later", sync.DbChannelData.DesiredChannelName) sync.setChannelTerminationStatus(&stopTheLoops) continue } secondLastChannelProcessed = lastChannelProcessed lastChannelProcessed = sync.DbChannelData.ChannelId shouldNotCount := false logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1) err := sync.FullCycle() //TODO: THIS IS A TEMPORARY WORK AROUND FOR THE STUPID IP LOCKUP BUG ipPool, _ := ip_manager.GetIPPool(sync.grp) if ipPool != nil { ipPool.ReleaseAll() } if err != nil { if strings.Contains(err.Error(), "quotaExceeded") { logUtils.SleepUntilQuotaReset() } fatalErrors := []string{ "default_wallet already exists", "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR", "NotEnoughFunds", "no space left on device", "there was a problem uploading the wallet", "the channel in the wallet is different than the channel in the database", "this channel does not belong to this wallet!", "You already have a stream claim published under the name", } if util.SubstringInSlice(err.Error(), fatalErrors) { return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err) } shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server") if !shouldNotCount { logUtils.SendInfoToSlack("A non fatal error was reported by the sync process.\n%s", errors.FullTrace(err)) } } err = logUtils.CleanupMetadata() if err != nil { log.Errorf("something went wrong while trying to clear out the video metadata directory: %s", errors.FullTrace(err)) } err = blobs_reflector.ReflectAndClean() if err != nil { return errors.Prefix("@Nikooo777 something went wrong while reflecting blobs", err) } logUtils.SendInfoToSlack("%s (%s) reached an end. Total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1) if !shouldNotCount { syncCount++ } if sync.IsInterrupted() || (s.CliFlags.Limit != 0 && syncCount >= s.CliFlags.Limit) { shouldInterruptLoop = true break } } if shouldInterruptLoop || s.CliFlags.SingleRun { break } } return nil } func (s *SyncManager) checkUsedSpace() error { usedPctile, err := GetUsedSpace(logUtils.GetBlobsDir()) if err != nil { return errors.Err(err) } if usedPctile >= 0.90 && !s.CliFlags.SkipSpaceCheck { return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) } log.Infof("disk usage: %.1f%%", usedPctile*100) return nil } // GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path func GetUsedSpace(path string) (float32, error) { var stat syscall.Statfs_t err := syscall.Statfs(path, &stat) if err != nil { return 0, err } // Available blocks * size per block = available space in bytes all := stat.Blocks * uint64(stat.Bsize) free := stat.Bfree * uint64(stat.Bsize) used := all - free return float32(used) / float32(all), nil }