add support for multiple queues

add support for blockchain.db pruning via new status
This commit is contained in:
Niko Storni 2020-10-27 19:50:10 +01:00
parent c4693c221f
commit f5f12e1560
4 changed files with 15 additions and 6 deletions

View file

@ -53,7 +53,8 @@ func main() {
cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports") cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube") cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube")
cmd.Flags().StringVar(&cliFlags.SyncStatus, "status", "", "Specify which queue to pull from. Overrides --update") cmd.Flags().StringVar(&cliFlags.Status, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&cliFlags.SecondaryStatus, "status2", "", "Specify which secondary queue to pull from.")
cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.") cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)") cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
@ -87,7 +88,7 @@ func ytSync(cmd *cobra.Command, args []string) {
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname) util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
} }
if cliFlags.SyncStatus != "" && !util.InSlice(cliFlags.SyncStatus, shared.SyncStatuses) { if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses) log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses)
return return
} }

View file

@ -76,13 +76,16 @@ func (s *SyncManager) Start() error {
shouldInterruptLoop = true shouldInterruptLoop = true
} else { } else {
var queuesToSync []string var queuesToSync []string
if s.CliFlags.SyncStatus != "" { if s.CliFlags.Status != "" {
queuesToSync = append(queuesToSync, s.CliFlags.SyncStatus) queuesToSync = append(queuesToSync, s.CliFlags.Status)
} else if s.CliFlags.SyncUpdate { } else if s.CliFlags.SyncUpdate {
queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusSynced) queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusSynced)
} else { } else {
queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusQueued) queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusQueued)
} }
if s.CliFlags.SecondaryStatus != "" {
queuesToSync = append(queuesToSync, s.CliFlags.SecondaryStatus)
}
queues: queues:
for _, q := range queuesToSync { for _, q := range queuesToSync {
channels, err := s.ApiConfig.FetchChannels(q, &s.CliFlags) channels, err := s.ApiConfig.FetchChannels(q, &s.CliFlags)

View file

@ -97,7 +97,9 @@ func (s *Sync) downloadBlockchainDB() error {
return errors.Err(err) return errors.Err(err)
} }
} }
if s.DbChannelData.WipeDB {
return nil
}
downloader, err := s.getS3Downloader() downloader, err := s.getS3Downloader()
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)

View file

@ -25,6 +25,7 @@ type YoutubeChannel struct {
LengthLimit int `json:"length_limit"` LengthLimit int `json:"length_limit"`
SizeLimit int `json:"size_limit"` SizeLimit int `json:"size_limit"`
LastUploadedVideo string `json:"last_uploaded_video"` LastUploadedVideo string `json:"last_uploaded_video"`
WipeDB bool `json:"wipe_db"`
} }
var NeverRetryFailures = []string{ var NeverRetryFailures = []string{
@ -53,7 +54,8 @@ type SyncFlags struct {
MaxTries int MaxTries int
Refill int Refill int
Limit int Limit int
SyncStatus string Status string
SecondaryStatus string
ChannelID string ChannelID string
SyncFrom int64 SyncFrom int64
SyncUntil int64 SyncUntil int64
@ -86,6 +88,7 @@ const (
StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done StatusSynced = "synced" // done
StatusWipeDb = "pendingdbwipe" // in sync queue. lbryum database will be pruned
StatusFailed = "failed" StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned StatusAbandoned = "abandoned" // deleted on youtube or banned