Skip to content

Commit

Permalink
Queue cordoning in Armadactl (#187) (#3860)
Browse files Browse the repository at this point in the history
* Plumbing queue cordoning into armadactl

* Making armadactl cordoning commands more intuitive, some formatting changes

* Formatting

* Making queue get more intuitive

* Correcting comments and error messages

* Removing single queue cordoning code in favour of cobra alias. Adding only-cordoned flag when fetching queues

* Removing unused methods

* Removing method prefix from error messages

* Fixing import order

Co-authored-by: Mustafa Ilyas <[email protected]>
  • Loading branch information
MustafaI and mustafai-gr authored Aug 2, 2024
1 parent 58f8ea8 commit 9608591
Show file tree
Hide file tree
Showing 11 changed files with 474 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cmd/armadactl/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ func getCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "get",
Short: "Retrieve information about armada resource",
Long: "Retrieve information about armada resource. Supported: queue, scheduling-report, queue-report, job-report",
Long: "Retrieve information about armada resource. Supported: queue, queues, scheduling-report, queue-report, job-report",
}
cmd.AddCommand(
queueGetCmd(),
queuesGetCmd(),
getSchedulingReportCmd(armadactl.New()),
getQueueSchedulingReportCmd(armadactl.New()),
getJobSchedulingReportCmd(armadactl.New()),
Expand Down
134 changes: 134 additions & 0 deletions cmd/armadactl/cmd/cordon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package cmd

import (
"fmt"

"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"

"github.com/spf13/cobra"
)

// cordon builds the top-level "cordon" command. The command has no action of
// its own; it only hosts the queues subcommand created by cordonQueues.
func cordon() *cobra.Command {
	root := &cobra.Command{
		Use:   "cordon",
		Short: "Pause scheduling by resource",
		Long:  "Pause scheduling by resource. Supported: queue, queues",
	}
	// The app instance is only needed by the subcommand, so build it inline.
	root.AddCommand(cordonQueues(armadactl.New()))
	return root
}

// uncordon builds the top-level "uncordon" command. The command has no action
// of its own; it only hosts the queues subcommand created by uncordonQueues.
func uncordon() *cobra.Command {
	root := &cobra.Command{
		Use:   "uncordon",
		Short: "Resume scheduling by resource",
		Long:  "Resume scheduling by resource. Supported: queue, queues",
	}
	// The app instance is only needed by the subcommand, so build it inline.
	root.AddCommand(uncordonQueues(armadactl.New()))
	return root
}

// cordonQueues builds the "queues" subcommand of "cordon" (also reachable via
// the "queue" alias), which pauses scheduling for a selection of queues.
//
// Queues are selected either by the names given as positional arguments or by
// the --match-labels flag. Supplying both, or neither, is rejected before any
// call is made. The --inverse and --dry-run flags are forwarded to
// a.CordonQueues.
//
// Underlying errors are wrapped with %w so callers can still inspect the cause
// with errors.Is / errors.As; the rendered messages are unchanged.
func cordonQueues(a *armadactl.App) *cobra.Command {
	cmd := &cobra.Command{
		Use:     "queues <queue_1> <queue_2> <queue_3> ...",
		Aliases: []string{"queue"},
		Short:   "Pause scheduling for select queues",
		Long:    "Pause scheduling for select queues. This can be achieved either by queue names or by labels.",
		PreRunE: func(cmd *cobra.Command, args []string) error {
			return initParams(cmd, a.Params)
		},
		RunE: func(cmd *cobra.Command, queues []string) error {
			// Validate every positional queue name; only the first failure is reported.
			errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
			if len(errs) > 0 {
				return fmt.Errorf("provided queue name invalid: %w", errs[0])
			}

			matchLabels, err := cmd.Flags().GetStringSlice("match-labels")
			if err != nil {
				return fmt.Errorf("error reading label selection: %w", err)
			}

			inverse, err := cmd.Flags().GetBool("inverse")
			if err != nil {
				return fmt.Errorf("error reading inverse flag: %w", err)
			}

			dryRun, err := cmd.Flags().GetBool("dry-run")
			if err != nil {
				return fmt.Errorf("error reading dry-run flag: %w", err)
			}

			// Exactly one selection mechanism (names or labels) must be in use.
			switch {
			case len(queues) == 0 && len(matchLabels) == 0:
				return fmt.Errorf("either queue names or match-labels must be set to determine queues to cordon")
			case len(queues) > 0 && len(matchLabels) > 0:
				return fmt.Errorf("you can cordon by either a set of queue names or a set of queue labels, but not both")
			}

			return a.CordonQueues(&armadactl.QueueQueryArgs{
				InQueueNames:      queues,
				ContainsAllLabels: matchLabels,
				InvertResult:      inverse,
				OnlyCordoned:      false,
			}, dryRun)
		},
	}
	cmd.Flags().StringSliceP("match-labels", "l", []string{}, "Provide a comma separated list of labels. Queues matching all provided labels will have scheduling paused. Defaults to empty.")
	cmd.Flags().Bool("inverse", false, "Select all queues which do not match the provided parameters")
	cmd.Flags().Bool("dry-run", false, "Show selection of queues that will be modified in this operation")

	return cmd
}

// uncordonQueues builds the "queues" subcommand of "uncordon" (also reachable
// via the "queue" alias), which resumes scheduling for a selection of queues.
//
// Queues are selected either by the names given as positional arguments or by
// the --match-labels flag. Supplying both, or neither, is rejected before any
// call is made. The --inverse and --dry-run flags are forwarded to
// a.UncordonQueues.
//
// Underlying errors are wrapped with %w so callers can still inspect the cause
// with errors.Is / errors.As; the rendered messages are unchanged.
func uncordonQueues(a *armadactl.App) *cobra.Command {
	cmd := &cobra.Command{
		Use:     "queues <queue_1> <queue_2> <queue_3> ...",
		Aliases: []string{"queue"},
		Short:   "Resume scheduling for select queues",
		Long:    "Resume scheduling for select queues. This can be achieved either by queue names or by labels.",
		PreRunE: func(cmd *cobra.Command, args []string) error {
			return initParams(cmd, a.Params)
		},
		RunE: func(cmd *cobra.Command, queues []string) error {
			// Validate every positional queue name; only the first failure is reported.
			errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
			if len(errs) > 0 {
				return fmt.Errorf("provided queue name invalid: %w", errs[0])
			}

			matchLabels, err := cmd.Flags().GetStringSlice("match-labels")
			if err != nil {
				return fmt.Errorf("error reading label selection: %w", err)
			}

			inverse, err := cmd.Flags().GetBool("inverse")
			if err != nil {
				return fmt.Errorf("error reading inverse flag: %w", err)
			}

			dryRun, err := cmd.Flags().GetBool("dry-run")
			if err != nil {
				return fmt.Errorf("error reading dry-run flag: %w", err)
			}

			// Exactly one selection mechanism (names or labels) must be in use.
			switch {
			case len(queues) == 0 && len(matchLabels) == 0:
				return fmt.Errorf("either queue names or match-labels must be set to determine queues to uncordon")
			case len(queues) > 0 && len(matchLabels) > 0:
				return fmt.Errorf("you can uncordon by either a set of queue names or a set of queue labels, but not both")
			}

			return a.UncordonQueues(&armadactl.QueueQueryArgs{
				InQueueNames:      queues,
				ContainsAllLabels: matchLabels,
				InvertResult:      inverse,
				OnlyCordoned:      false,
			}, dryRun)
		},
	}
	cmd.Flags().StringSliceP("match-labels", "l", []string{}, "Provide a comma separated list of labels. Queues matching all provided labels will have scheduling resumed. Defaults to empty.")
	cmd.Flags().Bool("inverse", false, "Select all queues which do not match the provided parameters")
	cmd.Flags().Bool("dry-run", false, "Show selection of queues that will be modified in this operation")

	return cmd
}
3 changes: 3 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.QueueAPI.Create = cq.Create(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Delete = cq.Delete(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Get = cq.Get(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.GetAll = cq.GetAll(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Update = cq.Update(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Cordon = cq.Cordon(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Uncordon = cq.Uncordon(client.ExtractCommandlineArmadaApiConnectionDetails)

return nil
}
93 changes: 88 additions & 5 deletions cmd/armadactl/cmd/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"fmt"

"github.com/armadaproject/armada/internal/common/slices"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
Expand All @@ -21,7 +23,7 @@ func queueCreateCmdWithApp(a *armadactl.App) *cobra.Command {
Short: "Create new queue",
Long: `Every job submitted to armada needs to be associated with queue.
Job priority is evaluated inside queue, queue has its own priority.`,
Job priority is evaluated inside queue, queue has its own priority. Any labels on the queue must have a Kubernetes-like key-value structure, for example: armadaproject.io/submitter=airflow.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
Expand All @@ -46,22 +48,36 @@ Job priority is evaluated inside queue, queue has its own priority.`,
return fmt.Errorf("error reading group-owners: %s", err)
}

queue, err := queue.NewQueue(&api.Queue{
cordoned, err := cmd.Flags().GetBool("cordon")
if err != nil {
return fmt.Errorf("error reading cordon: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("labels")
if err != nil {
return fmt.Errorf("error reading queue labels: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
Cordoned: cordoned,
Labels: labels,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
}

return a.CreateQueue(queue)
return a.CreateQueue(newQueue)
},
}
cmd.Flags().Float64("priority-factor", 1, "Set queue priority factor - lower number makes queue more important, must be > 0.")
cmd.Flags().StringSlice("owners", []string{}, "Comma separated list of queue owners, defaults to current user.")
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
return cmd
}

Expand Down Expand Up @@ -109,6 +125,59 @@ func queueGetCmdWithApp(a *armadactl.App) *cobra.Command {
return cmd
}

// queuesGetCmd returns the "queues" command backed by a newly constructed
// armadactl app.
func queuesGetCmd() *cobra.Command {
	app := armadactl.New()
	return queuesGetCmdWithApp(app)
}

// queuesGetCmdWithApp builds the "queues" command, which retrieves information
// about multiple queues. It takes a caller-supplied app struct, which is
// useful for testing.
//
// Queues may be filtered by the names given as positional arguments or by the
// --match-labels flag, but not both at once. With neither supplied, no name or
// label filter is sent to a.GetAllQueues. The --inverse and --only-cordoned
// flags are forwarded unchanged.
//
// Underlying errors are wrapped with %w so callers can still inspect the cause
// with errors.Is / errors.As; the rendered messages are unchanged.
func queuesGetCmdWithApp(a *armadactl.App) *cobra.Command {
	cmd := &cobra.Command{
		Use:   "queues <queue_1> <queue_2> <queue_3> ...",
		Short: "Gets information from multiple queues.",
		Long:  "Gets information from multiple queues, filtering by either a set of queue names or a set of labels. Defaults to retrieving all queues.",
		PreRunE: func(cmd *cobra.Command, args []string) error {
			return initParams(cmd, a.Params)
		},
		RunE: func(cmd *cobra.Command, queues []string) error {
			// Validate every positional queue name; only the first failure is reported.
			errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
			if len(errs) > 0 {
				return fmt.Errorf("provided queue name invalid: %w", errs[0])
			}

			onlyCordoned, err := cmd.Flags().GetBool("only-cordoned")
			if err != nil {
				return fmt.Errorf("error reading only-cordoned flag: %w", err)
			}

			inverse, err := cmd.Flags().GetBool("inverse")
			if err != nil {
				return fmt.Errorf("error reading inverse flag: %w", err)
			}

			labels, err := cmd.Flags().GetStringSlice("match-labels")
			if err != nil {
				return fmt.Errorf("error reading queue labels: %w", err)
			}

			// Names and labels are mutually exclusive selection mechanisms.
			if len(queues) > 0 && len(labels) > 0 {
				return fmt.Errorf("you can select either with a set of queue names or a set of queue labels, but not both")
			}

			return a.GetAllQueues(&armadactl.QueueQueryArgs{
				InQueueNames:      queues,
				ContainsAllLabels: labels,
				InvertResult:      inverse,
				OnlyCordoned:      onlyCordoned,
			})
		},
	}
	cmd.Flags().StringSliceP("match-labels", "l", []string{}, "Select queues by label.")
	cmd.Flags().Bool("inverse", false, "Inverts result to get all queues that don't match the specified criteria. Defaults to false.")
	cmd.Flags().Bool("only-cordoned", false, "Only returns queues that are cordoned. Defaults to false.")

	return cmd
}

func queueUpdateCmd() *cobra.Command {
return queueUpdateCmdWithApp(armadactl.New())
}
Expand Down Expand Up @@ -141,22 +210,36 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
return fmt.Errorf("error reading group-owners: %s", err)
}

queue, err := queue.NewQueue(&api.Queue{
cordoned, err := cmd.Flags().GetBool("cordon")
if err != nil {
return fmt.Errorf("error reading cordon: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("labels")
if err != nil {
return fmt.Errorf("error reading queue labels: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
Cordoned: cordoned,
Labels: labels,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
}

return a.UpdateQueue(queue)
return a.UpdateQueue(newQueue)
},
}
// TODO this will overwrite existing values with default values if not all flags are provided
cmd.Flags().Float64("priority-factor", 1, "Set queue priority factor - lower number makes queue more important, must be > 0.")
cmd.Flags().StringSlice("owners", []string{}, "Comma separated list of queue owners, defaults to current user.")
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
return cmd
}
2 changes: 2 additions & 0 deletions cmd/armadactl/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func RootCmd() *cobra.Command {
configCmd(armadactl.New()),
preemptCmd(),
docsCmd(),
cordon(),
uncordon(),
)

return cmd
Expand Down
10 changes: 10 additions & 0 deletions cmd/armadactl/cmd/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cmd

import "fmt"

// queueNameValidation reports whether queueName is acceptable as a queue name.
// The only rule enforced is that the name is not the empty string; any other
// value, including whitespace, yields a nil error.
func queueNameValidation(queueName string) error {
	if len(queueName) > 0 {
		return nil
	}
	return fmt.Errorf("cannot provide empty queue name")
}
1 change: 1 addition & 0 deletions developer/config/insecure-armada.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ auth:
submit_any_jobs: ["everyone"]
create_queue: ["everyone"]
delete_queue: ["everyone"]
cordon_queue: ["everyone"]
cancel_any_jobs: ["everyone"]
reprioritize_any_jobs: ["everyone"]
watch_all_events: ["everyone"]
Expand Down
Loading

0 comments on commit 9608591

Please sign in to comment.