Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preempt/Cancel on executor #4022

Merged
merged 15 commits into from
Nov 5, 2024
43 changes: 43 additions & 0 deletions cmd/armadactl/cmd/cancel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
Expand All @@ -16,6 +18,7 @@ func cancelCmd() *cobra.Command {
cmd.AddCommand(
cancelJobCmd(),
cancelJobSetCmd(),
cancelExecutorCmd(),
)
return cmd
}
Expand Down Expand Up @@ -58,3 +61,43 @@ func cancelJobSetCmd() *cobra.Command {
}
return cmd
}

func cancelExecutorCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "executor <executor>",
Short: "Cancels jobs on executor.",
Long: `Cancels jobs on executor with provided executor name, priority classes and queues.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return fmt.Errorf("error marking priority-class flag as required: %s", err)
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
onExecutor := args[0]

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-class selection: %s", err)
}

queues, err := cmd.Flags().GetStringSlice("queues")
if err != nil {
return fmt.Errorf("error reading queue selection: %s", err)
}

return a.CancelOnExecutor(onExecutor, queues, priorityClasses)
},
}

cmd.Flags().StringSliceP(
"queues",
"q",
[]string{},
"Cancel jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be cancelled. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.",
)
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Cancel jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
Copy link
Collaborator

@robertdavidsmith robertdavidsmith Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest

  • rename --match-queues to just --queues
  • for consistency, if --queues requires a command line argument, then priority classes should require a --priority-classes arg.
  • comment should specify what the delimiter is if you want to cancel multiple queues

}
3 changes: 3 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,8 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

params.ExecutorAPI.CancelOnExecutor = ce.CancelOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.PreemptOnExecutor = ce.PreemptOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

return nil
}
46 changes: 45 additions & 1 deletion cmd/armadactl/cmd/preempt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
Expand All @@ -12,7 +14,10 @@ func preemptCmd() *cobra.Command {
Short: "Preempt jobs in armada.",
Args: cobra.ExactArgs(0),
}
cmd.AddCommand(preemptJobCmd())
cmd.AddCommand(
preemptJobCmd(),
preemptExecutorCmd(),
)
return cmd
}

Expand All @@ -35,3 +40,42 @@ func preemptJobCmd() *cobra.Command {
}
return cmd
}

func preemptExecutorCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "executor <executor>",
Short: "Preempts jobs on executor.",
Long: `Preempts jobs on executor with provided executor name, priority classes and queues.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return fmt.Errorf("error marking priority-class flag as required: %s", err)
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
onExecutor := args[0]

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-class selection: %s", err)
}

queues, err := cmd.Flags().GetStringSlice("queues")
if err != nil {
return fmt.Errorf("error reading queue selection: %s", err)
}

return a.PreemptOnExecutor(onExecutor, queues, priorityClasses)
},
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to last comment

cmd.Flags().StringSliceP(
"queues",
"q",
[]string{},
"Preempt jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be preempted. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.",
)
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Preempt jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}
1 change: 1 addition & 0 deletions developer/config/insecure-armada.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ auth:
cordon_queue: ["everyone"]
cancel_any_jobs: ["everyone"]
reprioritize_any_jobs: ["everyone"]
preempt_any_jobs: ["everyone"]
watch_all_events: ["everyone"]
execute_jobs: ["everyone"]
update_executor_settings: ["everyone"]
6 changes: 4 additions & 2 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ type QueueAPI struct {
}

type ExecutorAPI struct {
Cordon executor.CordonAPI
Uncordon executor.UncordonAPI
Cordon executor.CordonAPI
Uncordon executor.UncordonAPI
CancelOnExecutor executor.CancelAPI
PreemptOnExecutor executor.PreemptAPI
}

// New instantiates an App with default parameters, including standard output
Expand Down
20 changes: 20 additions & 0 deletions internal/armadactl/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client"
)
Expand Down Expand Up @@ -53,3 +54,22 @@ func (a *App) CancelJobSet(queue string, jobSetId string) (outerErr error) {
return nil
})
}

func (a *App) CancelOnExecutor(executor string, queues []string, priorityClasses []string) error {
queueMsg := strings.Join(queues, ",")
priorityClassesMsg := strings.Join(priorityClasses, ",")
// If the provided slice of queues is empty, jobs on all queues will be cancelled
if len(queues) == 0 {
apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{})
if err != nil {
return fmt.Errorf("error cancelling jobs on executor %s: %s", executor, err)
}
queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name })
queueMsg = "all"
}
fmt.Fprintf(a.Out, "Requesting cancellation of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg)
if err := a.Params.ExecutorAPI.CancelOnExecutor(executor, queues, priorityClasses); err != nil {
return fmt.Errorf("error cancelling jobs on executor %s: %s", executor, err)
}
return nil
}
22 changes: 22 additions & 0 deletions internal/armadactl/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package armadactl

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client"
)
Expand All @@ -32,3 +34,23 @@ func (a *App) Preempt(queue string, jobSetId string, jobId string) (outerErr err
return nil
})
}

func (a *App) PreemptOnExecutor(executor string, queues []string, priorityClasses []string) error {
queueMsg := strings.Join(queues, ",")
priorityClassesMsg := strings.Join(priorityClasses, ",")
// If the provided slice of queues is empty, jobs on all queues will be cancelled
if len(queues) == 0 {
apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{})
if err != nil {
return fmt.Errorf("error preempting jobs on executor %s: %s", executor, err)
}
queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name })
queueMsg = "all"
}

fmt.Fprintf(a.Out, "Requesting preemption of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg)
if err := a.Params.ExecutorAPI.PreemptOnExecutor(executor, queues, priorityClasses); err != nil {
return fmt.Errorf("error preempting jobs on executor %s: %s", executor, err)
}
return nil
}
20 changes: 20 additions & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,26 @@ var DeleteExecutorSettings = &controlplaneevents.Event{
},
}

var PreemptOnExecutor = &controlplaneevents.Event{
Event: &controlplaneevents.Event_PreemptOnExecutor{
PreemptOnExecutor: &controlplaneevents.PreemptOnExecutor{
Name: ExecutorId,
Queues: []string{Queue},
PriorityClasses: []string{PriorityClassName},
},
},
}

var CancelOnExecutor = &controlplaneevents.Event{
Event: &controlplaneevents.Event_CancelOnExecutor{
CancelOnExecutor: &controlplaneevents.CancelOnExecutor{
Name: ExecutorId,
Queues: []string{Queue},
PriorityClasses: []string{PriorityClassName},
},
},
}

func JobSetCancelRequestedWithStateFilter(states ...armadaevents.JobState) *armadaevents.EventSequence_Event {
return &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Expand Down
Loading