diff --git a/cmd/armadactl/cmd/cancel.go b/cmd/armadactl/cmd/cancel.go index 80bff893c96..15e264db588 100644 --- a/cmd/armadactl/cmd/cancel.go +++ b/cmd/armadactl/cmd/cancel.go @@ -1,6 +1,8 @@ package cmd import ( + "fmt" + "github.com/spf13/cobra" "github.com/armadaproject/armada/internal/armadactl" @@ -16,6 +18,7 @@ func cancelCmd() *cobra.Command { cmd.AddCommand( cancelJobCmd(), cancelJobSetCmd(), + cancelExecutorCmd(), ) return cmd } @@ -58,3 +61,43 @@ func cancelJobSetCmd() *cobra.Command { } return cmd } + +func cancelExecutorCmd() *cobra.Command { + a := armadactl.New() + cmd := &cobra.Command{ + Use: "executor ", + Short: "Cancels jobs on executor.", + Long: `Cancels jobs on executor with provided executor name, priority classes and queues.`, + Args: cobra.ExactArgs(1), + PreRunE: func(cmd *cobra.Command, args []string) error { + if err := cmd.MarkFlagRequired("priority-classes"); err != nil { + return fmt.Errorf("error marking priority-class flag as required: %s", err) + } + return initParams(cmd, a.Params) + }, + RunE: func(cmd *cobra.Command, args []string) error { + onExecutor := args[0] + + priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes") + if err != nil { + return fmt.Errorf("error reading priority-class selection: %s", err) + } + + queues, err := cmd.Flags().GetStringSlice("queues") + if err != nil { + return fmt.Errorf("error reading queue selection: %s", err) + } + + return a.CancelOnExecutor(onExecutor, queues, priorityClasses) + }, + } + + cmd.Flags().StringSliceP( + "queues", + "q", + []string{}, + "Cancel jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be cancelled. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.", + ) + cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Cancel jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.") + return cmd +} diff --git a/cmd/armadactl/cmd/params.go b/cmd/armadactl/cmd/params.go index 57c2263f3a2..c1c6e77dd2d 100644 --- a/cmd/armadactl/cmd/params.go +++ b/cmd/armadactl/cmd/params.go @@ -33,5 +33,8 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error { params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails) params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails) + params.ExecutorAPI.CancelOnExecutor = ce.CancelOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails) + params.ExecutorAPI.PreemptOnExecutor = ce.PreemptOnExecutor(client.ExtractCommandlineArmadaApiConnectionDetails) + return nil } diff --git a/cmd/armadactl/cmd/preempt.go b/cmd/armadactl/cmd/preempt.go index 4458e905b0e..8380f691d75 100644 --- a/cmd/armadactl/cmd/preempt.go +++ b/cmd/armadactl/cmd/preempt.go @@ -1,6 +1,8 @@ package cmd import ( + "fmt" + "github.com/spf13/cobra" "github.com/armadaproject/armada/internal/armadactl" @@ -12,7 +14,10 @@ func preemptCmd() *cobra.Command { Short: "Preempt jobs in armada.", Args: cobra.ExactArgs(0), } - cmd.AddCommand(preemptJobCmd()) + cmd.AddCommand( + preemptJobCmd(), + preemptExecutorCmd(), + ) return cmd } @@ -35,3 +40,42 @@ func preemptJobCmd() *cobra.Command { } return cmd } + +func preemptExecutorCmd() *cobra.Command { + a := armadactl.New() + cmd := &cobra.Command{ + Use: "executor ", + Short: "Preempts jobs on executor.", + Long: `Preempts jobs on executor with provided executor name, priority classes and queues.`, + Args: cobra.ExactArgs(1), + PreRunE: func(cmd *cobra.Command, args []string) error { + if err := cmd.MarkFlagRequired("priority-classes"); err != nil { + return fmt.Errorf("error marking priority-class flag as required: %s", err) + } + return initParams(cmd, a.Params) + }, + RunE: func(cmd *cobra.Command, args []string) error { + onExecutor := args[0] + + priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes") + if err != nil { + return fmt.Errorf("error reading priority-class selection: %s", err) + } + + queues, err := cmd.Flags().GetStringSlice("queues") + if err != nil { + return fmt.Errorf("error reading queue selection: %s", err) + } + + return a.PreemptOnExecutor(onExecutor, queues, priorityClasses) + }, + } + cmd.Flags().StringSliceP( + "queues", + "q", + []string{}, + "Preempt jobs on executor matching the specified queue names. If no queues are provided, jobs across all queues will be preempted. Provided queues should be comma separated, as in the following example: queueA,queueB,queueC.", + ) + cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Preempt jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.") + return cmd +} diff --git a/developer/config/insecure-armada.yaml b/developer/config/insecure-armada.yaml index a006ec84c24..586176ad988 100644 --- a/developer/config/insecure-armada.yaml +++ b/developer/config/insecure-armada.yaml @@ -9,6 +9,7 @@ auth: cordon_queue: ["everyone"] cancel_any_jobs: ["everyone"] reprioritize_any_jobs: ["everyone"] + preempt_any_jobs: ["everyone"] watch_all_events: ["everyone"] execute_jobs: ["everyone"] update_executor_settings: ["everyone"] diff --git a/internal/armadactl/app.go b/internal/armadactl/app.go index 8bc1f5eccc3..ef87562ac9e 100644 --- a/internal/armadactl/app.go +++ b/internal/armadactl/app.go @@ -59,8 +59,10 @@ type QueueAPI struct { } type ExecutorAPI struct { - Cordon executor.CordonAPI - Uncordon executor.UncordonAPI + Cordon executor.CordonAPI + Uncordon executor.UncordonAPI + CancelOnExecutor executor.CancelAPI + PreemptOnExecutor executor.PreemptAPI } // New instantiates an App with default parameters, including standard output diff --git a/internal/armadactl/cancel.go b/internal/armadactl/cancel.go index 39cf37b19ae..4196b6e8339 100644 --- a/internal/armadactl/cancel.go +++ b/internal/armadactl/cancel.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/armadaproject/armada/internal/common" + armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/pkg/api" "github.com/armadaproject/armada/pkg/client" ) @@ -53,3 +54,22 @@ func (a *App) CancelJobSet(queue string, jobSetId string) (outerErr error) { return nil }) } + +func (a *App) CancelOnExecutor(executor string, queues []string, priorityClasses []string) error { + queueMsg := strings.Join(queues, ",") + priorityClassesMsg := strings.Join(priorityClasses, ",") + // If the provided slice of queues is empty, jobs on all queues will be cancelled + if len(queues) == 0 { + apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{}) + if err != nil { + return fmt.Errorf("error cancelling jobs on executor %s: %s", executor, err) + } + queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name }) + queueMsg = "all" + } + fmt.Fprintf(a.Out, "Requesting cancellation of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg) + if err := a.Params.ExecutorAPI.CancelOnExecutor(executor, queues, priorityClasses); err != nil { + return fmt.Errorf("error cancelling jobs on executor %s: %s", executor, err) + } + return nil +} diff --git a/internal/armadactl/preempt.go b/internal/armadactl/preempt.go index 8c9b183a072..185f7b8e70f 100644 --- a/internal/armadactl/preempt.go +++ b/internal/armadactl/preempt.go @@ -2,10 +2,12 @@ package armadactl import ( "fmt" + "strings" "github.com/pkg/errors" "github.com/armadaproject/armada/internal/common" + armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/pkg/api" "github.com/armadaproject/armada/pkg/client" ) @@ -32,3 +34,23 @@ func (a *App) Preempt(queue string, jobSetId string, jobId string) (outerErr err return nil }) } + +func (a *App) PreemptOnExecutor(executor string, queues []string, priorityClasses []string) error { + queueMsg := strings.Join(queues, ",") + priorityClassesMsg := strings.Join(priorityClasses, ",") + // If the provided slice of queues is empty, jobs on all queues will be cancelled + if len(queues) == 0 { + apiQueues, err := a.getAllQueuesAsAPIQueue(&QueueQueryArgs{}) + if err != nil { + return fmt.Errorf("error preempting jobs on executor %s: %s", executor, err) + } + queues = armadaslices.Map(apiQueues, func(q *api.Queue) string { return q.Name }) + queueMsg = "all" + } + + fmt.Fprintf(a.Out, "Requesting preemption of jobs matching executor: %s, queues: %s, priority-classes: %s\n", executor, queueMsg, priorityClassesMsg) + if err := a.Params.ExecutorAPI.PreemptOnExecutor(executor, queues, priorityClasses); err != nil { + return fmt.Errorf("error preempting jobs on executor %s: %s", executor, err) + } + return nil +} diff --git a/internal/common/ingest/testfixtures/event.go b/internal/common/ingest/testfixtures/event.go index ecdb173b5dd..05a38e665aa 100644 --- a/internal/common/ingest/testfixtures/event.go +++ b/internal/common/ingest/testfixtures/event.go @@ -516,6 +516,26 @@ var DeleteExecutorSettings = &controlplaneevents.Event{ }, } +var PreemptOnExecutor = &controlplaneevents.Event{ + Event: &controlplaneevents.Event_PreemptOnExecutor{ + PreemptOnExecutor: &controlplaneevents.PreemptOnExecutor{ + Name: ExecutorId, + Queues: []string{Queue}, + PriorityClasses: []string{PriorityClassName}, + }, + }, +} + +var CancelOnExecutor = &controlplaneevents.Event{ + Event: &controlplaneevents.Event_CancelOnExecutor{ + CancelOnExecutor: &controlplaneevents.CancelOnExecutor{ + Name: ExecutorId, + Queues: []string{Queue}, + PriorityClasses: []string{PriorityClassName}, + }, + }, +} + func JobSetCancelRequestedWithStateFilter(states ...armadaevents.JobState) *armadaevents.EventSequence_Event { return &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, diff --git a/internal/scheduler/database/query.sql.go b/internal/scheduler/database/query.sql.go index f4e8f9a95a4..6e69dd08d07 100644 --- a/internal/scheduler/database/query.sql.go +++ b/internal/scheduler/database/query.sql.go @@ -430,38 +430,38 @@ type SelectInitialJobsRow struct { func (q *Queries) SelectInitialJobs(ctx context.Context, arg SelectInitialJobsParams) ([]SelectInitialJobsRow, error) { rows, err := q.db.Query(ctx, selectInitialJobs, arg.Serial, arg.Limit) if err != nil { - return nil, err - } + return nil, err + } defer rows.Close() var items []SelectInitialJobsRow for rows.Next() { - var i SelectInitialJobsRow - if err := rows.Scan( - &i.JobID, - &i.JobSet, - &i.Queue, - &i.Priority, - &i.Submitted, - &i.Queued, - &i.QueuedVersion, - &i.Validated, - &i.CancelRequested, - &i.CancelByJobsetRequested, - &i.Cancelled, - &i.Succeeded, - &i.Failed, - &i.SchedulingInfo, - &i.SchedulingInfoVersion, - &i.Pools, - &i.Serial, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { + var i SelectInitialJobsRow + if err := rows.Scan( + &i.JobID, + &i.JobSet, + &i.Queue, + &i.Priority, + &i.Submitted, + &i.Queued, + &i.QueuedVersion, + &i.Validated, + &i.CancelRequested, + &i.CancelByJobsetRequested, + &i.Cancelled, + &i.Succeeded, + &i.Failed, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, + &i.Pools, + &i.Serial, + ); err != nil { return nil, err } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } return items, nil } @@ -529,47 +529,47 @@ type SelectInitialRunsParams struct { func (q *Queries) SelectInitialRuns(ctx context.Context, arg SelectInitialRunsParams) ([]Run, error) { rows, err := q.db.Query(ctx, selectInitialRuns, arg.Serial, arg.Limit, arg.JobIds) if err != nil { - return nil, err - } + return nil, err + } defer rows.Close() var items []Run for rows.Next() { - var i Run - if err := rows.Scan( - &i.RunID, - &i.JobID, - &i.Created, - &i.JobSet, - &i.Executor, - &i.Node, - &i.Cancelled, - &i.Running, - &i.Succeeded, - &i.Failed, - &i.Returned, - &i.RunAttempted, - &i.Serial, - &i.LastModified, - &i.LeasedTimestamp, - &i.PendingTimestamp, - &i.RunningTimestamp, - &i.TerminatedTimestamp, - &i.ScheduledAtPriority, - &i.Preempted, - &i.Pending, - &i.PreemptedTimestamp, - &i.PodRequirementsOverlay, - &i.PreemptRequested, - &i.Queue, - &i.Pool, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { + var i Run + if err := rows.Scan( + &i.RunID, + &i.JobID, + &i.Created, + &i.JobSet, + &i.Executor, + &i.Node, + &i.Cancelled, + &i.Running, + &i.Succeeded, + &i.Failed, + &i.Returned, + &i.RunAttempted, + &i.Serial, + &i.LastModified, + &i.LeasedTimestamp, + &i.PendingTimestamp, + &i.RunningTimestamp, + &i.TerminatedTimestamp, + &i.ScheduledAtPriority, + &i.Preempted, + &i.Pending, + &i.PreemptedTimestamp, + &i.PodRequirementsOverlay, + &i.PreemptRequested, + &i.Queue, + &i.Pool, + ); err != nil { return nil, err } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } return items, nil } @@ -892,11 +892,11 @@ VALUES($1::text, $2::boolean, $3::text, $4::text, $5::timestamptz) ON CONFLICT (executor_id) DO UPDATE SET (cordoned, cordon_reason, set_by_user, set_at_time) = (excluded.cordoned, excluded.cordon_reason, excluded.set_by_user, excluded.set_at_time)` type UpsertExecutorSettingsParams struct { - ExecutorID string `db:"executor_id"` - Cordoned bool `db:"cordoned"` - CordonReason string `db:"cordon_reason"` - SetByUser string `db:"set_by_user"` - SetAtTime time.Time `db:"set_at_time"` + ExecutorID string `db:"executor_id"` + Cordoned bool `db:"cordoned"` + CordonReason string `db:"cordon_reason"` + SetByUser string `db:"set_by_user"` + SetAtTime time.Time `db:"set_at_time"` } func (q *Queries) UpsertExecutorSettings(ctx context.Context, arg UpsertExecutorSettingsParams) error { @@ -962,3 +962,55 @@ func (q *Queries) SelectLatestJobRunSerial(ctx context.Context) (int64, error) { err := row.Scan(&serial) return serial, err } + +const selectJobsByExecutorAndQueues = `-- name: SelectJobsByExecutorAndQueues :many +SELECT j.* +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE jr.executor = $1 + AND j.queue = ANY($2::text[]) + AND jr.succeeded = false AND jr.failed = false AND jr.cancelled = false AND jr.preempted = false +` + +func (q *Queries) SelectAllJobsByExecutorAndQueues(ctx context.Context, executor string, queues []string) ([]Job, error) { + rows, err := q.db.Query(ctx, selectJobsByExecutorAndQueues, executor, queues) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.JobID, + &i.JobSet, + &i.Queue, + &i.UserID, + &i.Submitted, + &i.Groups, + &i.Priority, + &i.Queued, + &i.QueuedVersion, + &i.CancelRequested, + &i.Cancelled, + &i.CancelByJobsetRequested, + &i.Succeeded, + &i.Failed, + &i.SubmitMessage, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, + &i.Serial, + &i.LastModified, + &i.Validated, + &i.Pools, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/internal/scheduleringester/dbops.go b/internal/scheduleringester/dbops.go index 4ac250d9931..dbf3c7ec93f 100644 --- a/internal/scheduleringester/dbops.go +++ b/internal/scheduleringester/dbops.go @@ -75,6 +75,18 @@ type ExecutorSettingsDelete struct { ExecutorID string } +type PreemptOnExecutor struct { + Name string + Queues []string + PriorityClasses []string +} + +type CancelOnExecutor struct { + Name string + Queues []string + PriorityClasses []string +} + // DbOperation captures a generic batch database operation. // // There are 5 types of operations: @@ -171,6 +183,8 @@ type ( UpsertExecutorSettings map[string]*ExecutorSettingsUpsert DeleteExecutorSettings map[string]*ExecutorSettingsDelete + PreemptExecutor map[string]*PreemptOnExecutor + CancelExecutor map[string]*CancelOnExecutor ) type jobSetOperation interface { @@ -315,6 +329,14 @@ func (a DeleteExecutorSettings) Merge(_ DbOperation) bool { return false } +func (pe PreemptExecutor) Merge(_ DbOperation) bool { + return false +} + +func (ce CancelExecutor) Merge(_ DbOperation) bool { + return false +} + // MergeInMap merges an op b into a, provided that b is of the same type as a. // For example, if a is of type MarkJobSetsCancelRequested, b is only merged if also of type MarkJobSetsCancelRequested. // Returns true if the ops were merged and false otherwise. @@ -467,15 +489,22 @@ func (a MarkJobsValidated) CanBeAppliedBefore(b DbOperation) bool { // Can be applied before another operation only if it relates to a different executor func (a UpsertExecutorSettings) CanBeAppliedBefore(b DbOperation) bool { switch op := b.(type) { - case UpsertExecutorSettings: - for k := range a { - if _, ok := op[k]; ok { + case executorOperation: + for executor := range a { + if affectsExecutor := op.affectsExecutor(executor); affectsExecutor { return false } } - case DeleteExecutorSettings: - for k := range a { - if _, ok := op[k]; ok { + } + return true +} + +// Can be applied before another operation only if it relates to a different executor +func (a DeleteExecutorSettings) CanBeAppliedBefore(b DbOperation) bool { + switch op := b.(type) { + case executorOperation: + for executor := range a { + if affectsExecutor := op.affectsExecutor(executor); affectsExecutor { return false } } @@ -483,18 +512,23 @@ func (a UpsertExecutorSettings) CanBeAppliedBefore(b DbOperation) bool { return true } -// Can be applied before another operation only if it relates to a different executor -func (a DeleteExecutorSettings) CanBeAppliedBefore(b DbOperation) bool { +func (pe PreemptExecutor) CanBeAppliedBefore(b DbOperation) bool { switch op := b.(type) { - case UpsertExecutorSettings: - for k := range a { - if _, ok := op[k]; ok { + case executorOperation: + for executor := range pe { + if affectsExecutor := op.affectsExecutor(executor); affectsExecutor { return false } } - case DeleteExecutorSettings: - for k := range a { - if _, ok := op[k]; ok { + } + return true +} + +func (ce CancelExecutor) CanBeAppliedBefore(b DbOperation) bool { + switch op := b.(type) { + case executorOperation: + for executor := range ce { + if affectsExecutor := op.affectsExecutor(executor); affectsExecutor { return false } } @@ -641,3 +675,35 @@ func (a UpsertExecutorSettings) GetOperation() Operation { func (a DeleteExecutorSettings) GetOperation() Operation { return ControlPlaneOperation } + +func (pe PreemptExecutor) GetOperation() Operation { + return ControlPlaneOperation +} + +func (ce CancelExecutor) GetOperation() Operation { + return ControlPlaneOperation +} + +type executorOperation interface { + affectsExecutor(string) bool +} + +func (a UpsertExecutorSettings) affectsExecutor(executor string) bool { + _, ok := a[executor] + return ok +} + +func (a DeleteExecutorSettings) affectsExecutor(executor string) bool { + _, ok := a[executor] + return ok +} + +func (pe PreemptExecutor) affectsExecutor(executor string) bool { + _, ok := pe[executor] + return ok +} + +func (ce CancelExecutor) affectsExecutor(executor string) bool { + _, ok := ce[executor] + return ok +} diff --git a/internal/scheduleringester/instructions.go b/internal/scheduleringester/instructions.go index 9834f25be71..13a526bb57d 100644 --- a/internal/scheduleringester/instructions.go +++ b/internal/scheduleringester/instructions.go @@ -410,6 +410,10 @@ func (c *ControlPlaneEventsInstructionConverter) dbOperationFromControlPlaneEven operations, err = c.handleExecutorSettingsUpsert(event.GetExecutorSettingsUpsert(), eventTime) case *controlplaneevents.Event_ExecutorSettingsDelete: operations, err = c.handleExecutorSettingsDelete(event.GetExecutorSettingsDelete()) + case *controlplaneevents.Event_PreemptOnExecutor: + operations, err = c.handlePreemptOnExecutor(event.GetPreemptOnExecutor()) + case *controlplaneevents.Event_CancelOnExecutor: + operations, err = c.handleCancelOnExecutor(event.GetCancelOnExecutor()) default: log.Errorf("Unknown event of type %T", ev) } @@ -445,6 +449,30 @@ func (c *ControlPlaneEventsInstructionConverter) handleExecutorSettingsDelete(de }, nil } +func (c *ControlPlaneEventsInstructionConverter) handlePreemptOnExecutor(preempt *controlplaneevents.PreemptOnExecutor) ([]DbOperation, error) { + return []DbOperation{ + PreemptExecutor{ + preempt.Name: { + Name: preempt.Name, + Queues: preempt.Queues, + PriorityClasses: preempt.PriorityClasses, + }, + }, + }, nil +} + +func (c *ControlPlaneEventsInstructionConverter) handleCancelOnExecutor(cancel *controlplaneevents.CancelOnExecutor) ([]DbOperation, error) { + return []DbOperation{ + CancelExecutor{ + cancel.Name: { + Name: cancel.Name, + Queues: cancel.Queues, + PriorityClasses: cancel.PriorityClasses, + }, + }, + }, nil +} + // schedulingInfoFromSubmitJob returns a minimal representation of a job containing only the info needed by the scheduler. func (c *JobSetEventsInstructionConverter) schedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time) (*schedulerobjects.JobSchedulingInfo, error) { return SchedulingInfoFromSubmitJob(submitJob, submitTime) diff --git a/internal/scheduleringester/instructions_test.go b/internal/scheduleringester/instructions_test.go index 6b8cc4af692..ca4c6760e08 100644 --- a/internal/scheduleringester/instructions_test.go +++ b/internal/scheduleringester/instructions_test.go @@ -272,6 +272,26 @@ func TestConvertControlPlaneEvent(t *testing.T) { }, }}, }, + "preempt on executor": { + event: f.PreemptOnExecutor, + expected: []DbOperation{PreemptExecutor{ + f.ExecutorId: &PreemptOnExecutor{ + Name: f.ExecutorId, + Queues: []string{f.Queue}, + PriorityClasses: []string{f.PriorityClassName}, + }, + }}, + }, + "cancel on executor": { + event: f.CancelOnExecutor, + expected: []DbOperation{CancelExecutor{ + f.ExecutorId: &CancelOnExecutor{ + Name: f.ExecutorId, + Queues: []string{f.Queue}, + PriorityClasses: []string{f.PriorityClassName}, + }, + }}, + }, } for name, tc := range tests { diff --git a/internal/scheduleringester/schedulerdb.go b/internal/scheduleringester/schedulerdb.go index 5b99b1f22a2..3ab11bfb8e6 100644 --- a/internal/scheduleringester/schedulerdb.go +++ b/internal/scheduleringester/schedulerdb.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/gogo/protobuf/proto" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/pkg/errors" @@ -15,6 +16,7 @@ import ( "github.com/armadaproject/armada/internal/common/ingest/metrics" "github.com/armadaproject/armada/internal/common/slices" schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) const ( @@ -378,12 +380,125 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper } } return nil + case CancelExecutor: + for executor, cancelRequest := range o { + jobs, err := queries.SelectAllJobsByExecutorAndQueues(ctx, executor, cancelRequest.Queues) + if err != nil { + return errors.Wrapf(err, "error cancelling jobs on executor %s by queue and priority class", executor) + } + inPriorityClasses := jobInPriorityClasses(cancelRequest.PriorityClasses) + jobsToCancel := make([]schedulerdb.Job, 0) + for _, job := range jobs { + ok, err := inPriorityClasses(job) + if err != nil { + return errors.Wrapf(err, "error cancelling jobs on executor %s by queue and priority class", executor) + } + if ok { + jobsToCancel = append(jobsToCancel, job) + } + } + for _, requestCancelParams := range createMarkJobsCancelRequestedByIdParams(jobsToCancel) { + err = queries.MarkJobsCancelRequestedById(ctx, *requestCancelParams) + if err != nil { + return errors.Wrapf(err, "error cancelling jobs on executor %s by queue and priority class", executor) + } + } + } + case PreemptExecutor: + for executor, preemptRequest := range o { + jobs, err := queries.SelectAllJobsByExecutorAndQueues(ctx, executor, preemptRequest.Queues) + if err != nil { + return errors.Wrapf(err, "error preempting jobs on executor %s by queue and priority class", executor) + } + inPriorityClasses := jobInPriorityClasses(preemptRequest.PriorityClasses) + jobsToPreempt := make([]schedulerdb.Job, 0) + for _, job := range jobs { + ok, err := inPriorityClasses(job) + if err != nil { + return errors.Wrapf(err, "error preempting jobs on executor %s by queue and priority class", executor) + } + if ok { + jobsToPreempt = append(jobsToPreempt, job) + } + } + for _, requestPreemptParams := range createMarkJobRunsPreemptRequestedByJobIdParams(jobsToPreempt) { + err = queries.MarkJobRunsPreemptRequestedByJobId(ctx, *requestPreemptParams) + if err != nil { + return errors.Wrapf(err, "error preempting jobs on executor %s by queue and priority class", executor) + } + } + } default: return errors.Errorf("received unexpected op %+v", op) } return nil } +// createMarkJobCancelRequestedById returns []*schedulerdb.MarkJobsCancelRequestedByIdParams for the specified jobs such +// that no two MarkJobsCancelRequestedByIdParams are for the same queue and jobset +func createMarkJobsCancelRequestedByIdParams(jobs []schedulerdb.Job) []*schedulerdb.MarkJobsCancelRequestedByIdParams { + result := make([]*schedulerdb.MarkJobsCancelRequestedByIdParams, 0) + mapping := map[string]map[string]*schedulerdb.MarkJobsCancelRequestedByIdParams{} + for _, job := range jobs { + if _, ok := mapping[job.Queue]; !ok { + mapping[job.Queue] = map[string]*schedulerdb.MarkJobsCancelRequestedByIdParams{} + } + if _, ok := mapping[job.Queue][job.JobSet]; !ok { + mapping[job.Queue][job.JobSet] = &schedulerdb.MarkJobsCancelRequestedByIdParams{ + Queue: job.Queue, + JobSet: job.JobSet, + JobIds: make([]string, 0), + } + result = append(result, mapping[job.Queue][job.JobSet]) + } + + mapping[job.Queue][job.JobSet].JobIds = append(mapping[job.Queue][job.JobSet].JobIds, job.JobID) + } + + return result +} + +// createMarkJobRunsPreemptRequestedByJobIdParams returns []schedulerdb.MarkJobRunsPreemptRequestedByJobIdParams for the specified jobs such +// that no two MarkJobRunsPreemptRequestedByJobIdParams are for the same queue and jobset +func createMarkJobRunsPreemptRequestedByJobIdParams(jobs []schedulerdb.Job) []*schedulerdb.MarkJobRunsPreemptRequestedByJobIdParams { + result := make([]*schedulerdb.MarkJobRunsPreemptRequestedByJobIdParams, 0) + mapping := map[string]map[string]*schedulerdb.MarkJobRunsPreemptRequestedByJobIdParams{} + for _, job := range jobs { + if _, ok := mapping[job.Queue]; !ok { + mapping[job.Queue] = map[string]*schedulerdb.MarkJobRunsPreemptRequestedByJobIdParams{} + } + if _, ok := mapping[job.Queue][job.JobSet]; !ok { + mapping[job.Queue][job.JobSet] = &schedulerdb.MarkJobRunsPreemptRequestedByJobIdParams{ + Queue: job.Queue, + JobSet: job.JobSet, + JobIds: make([]string, 0), + } + result = append(result, mapping[job.Queue][job.JobSet]) + } + + mapping[job.Queue][job.JobSet].JobIds = append(mapping[job.Queue][job.JobSet].JobIds, job.JobID) + } + + return result +} + +func jobInPriorityClasses(priorityClasses []string) func(schedulerdb.Job) (bool, error) { + priorityClassMap := map[string]bool{} + for _, priorityClass := range priorityClasses { + priorityClassMap[priorityClass] = true + } + return func(job schedulerdb.Job) (bool, error) { + schedulingInfo := &schedulerobjects.JobSchedulingInfo{} + if err := proto.Unmarshal(job.SchedulingInfo, schedulingInfo); err != nil { + err = errors.Wrapf(err, "error unmarshalling scheduling info for job %s", job.JobID) + return false, err + } + + _, ok := priorityClassMap[schedulingInfo.PriorityClassName] + return ok, nil + } +} + // getLockKey is a local method to determine a lock key for the provided set of operations. An error will be returned // if the operations don't have an associated lock key. Currently, only jobSet events require locking as they rely // on row sequence numbers being monotonically increasing. diff --git a/internal/server/executor/executor.go b/internal/server/executor/executor.go index fff09bf7ef9..3cbc4509deb 100644 --- a/internal/server/executor/executor.go +++ b/internal/server/executor/executor.go @@ -105,3 +105,69 @@ func (s *Server) DeleteExecutorSettings(grpcCtx context.Context, req *api.Execut return &types.Empty{}, nil } + +func (s *Server) PreemptOnExecutor(grpcCtx context.Context, req *api.ExecutorPreemptRequest) (*types.Empty, error) { + ctx := armadacontext.FromGrpcCtx(grpcCtx) + err := s.authorizer.AuthorizeAction(ctx, permissions.PreemptAnyJobs) + var ep *armadaerrors.ErrUnauthorized + if errors.As(err, &ep) { + return nil, status.Errorf(codes.PermissionDenied, "error preempting jobs on executor %s: %s", req.Name, ep) + } else if err != nil { + return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err) + } + + if req.Name == "" { + return nil, fmt.Errorf("must provide non-empty executor name when determining what to preempt") + } + + es := &controlplaneevents.Event{ + Created: protoutil.ToTimestamp(s.clock.Now().UTC()), + Event: &controlplaneevents.Event_PreemptOnExecutor{ + PreemptOnExecutor: &controlplaneevents.PreemptOnExecutor{ + Name: req.Name, + Queues: req.Queues, + PriorityClasses: req.PriorityClasses, + }, + }, + } + + err = s.publisher.PublishMessages(ctx, es) + if err != nil { + return nil, status.Error(codes.Internal, "Failed to send events to Pulsar") + } + + return &types.Empty{}, nil +} + +func (s *Server) CancelOnExecutor(grpcCtx context.Context, req *api.ExecutorCancelRequest) (*types.Empty, error) { + ctx := armadacontext.FromGrpcCtx(grpcCtx) + err := s.authorizer.AuthorizeAction(ctx, permissions.CancelAnyJobs) + var ep *armadaerrors.ErrUnauthorized + if errors.As(err, &ep) { + return nil, status.Errorf(codes.PermissionDenied, "error cancelling jobs on executor %s: %s", req.Name, ep) + } else if err != nil { + return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err) + } + + if req.Name == "" { + return nil, fmt.Errorf("must provide non-empty executor name when determining what to cancel") + } + + es := &controlplaneevents.Event{ + Created: protoutil.ToTimestamp(s.clock.Now().UTC()), + Event: &controlplaneevents.Event_CancelOnExecutor{ + CancelOnExecutor: &controlplaneevents.CancelOnExecutor{ + Name: req.Name, + Queues: req.Queues, + PriorityClasses: req.PriorityClasses, + }, + }, + } + + err = s.publisher.PublishMessages(ctx, es) + if err != nil { + return nil, status.Error(codes.Internal, "Failed to send events to Pulsar") + } + + return &types.Empty{}, nil +} diff --git a/pkg/api/executor.pb.go b/pkg/api/executor.pb.go index be71aab0201..c2966395808 100644 --- a/pkg/api/executor.pb.go +++ b/pkg/api/executor.pb.go @@ -133,38 +133,171 @@ func (m *ExecutorSettingsDeleteRequest) GetName() string { return "" } +// Jobs on the specified executor matching both the provided queues and priority classes will be preempted +type ExecutorPreemptRequest struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Queues []string `protobuf:"bytes,2,rep,name=queues,proto3" json:"queues,omitempty"` + PriorityClasses []string `protobuf:"bytes,3,rep,name=priorityClasses,proto3" json:"priorityClasses,omitempty"` +} + +func (m *ExecutorPreemptRequest) Reset() { *m = ExecutorPreemptRequest{} } +func (m *ExecutorPreemptRequest) String() string { return proto.CompactTextString(m) } +func (*ExecutorPreemptRequest) ProtoMessage() {} +func (*ExecutorPreemptRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_506cd9cd149291ea, []int{2} +} +func (m *ExecutorPreemptRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ExecutorPreemptRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ExecutorPreemptRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ExecutorPreemptRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecutorPreemptRequest.Merge(m, src) +} +func (m *ExecutorPreemptRequest) XXX_Size() int { + return m.Size() +} +func (m *ExecutorPreemptRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ExecutorPreemptRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecutorPreemptRequest proto.InternalMessageInfo + +func (m *ExecutorPreemptRequest) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *ExecutorPreemptRequest) GetQueues() []string { + if m != nil { + return m.Queues + } + return nil +} + +func (m *ExecutorPreemptRequest) GetPriorityClasses() []string { + if m != nil { + return m.PriorityClasses + } + return nil +} + +// Jobs on the specified executor matching both the provided queues and priority classes will be cancelled +type ExecutorCancelRequest struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Queues []string `protobuf:"bytes,2,rep,name=queues,proto3" json:"queues,omitempty"` + PriorityClasses []string `protobuf:"bytes,3,rep,name=priorityClasses,proto3" json:"priorityClasses,omitempty"` +} + +func (m *ExecutorCancelRequest) Reset() { *m = ExecutorCancelRequest{} } +func (m *ExecutorCancelRequest) String() string { return proto.CompactTextString(m) } +func (*ExecutorCancelRequest) ProtoMessage() {} +func (*ExecutorCancelRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_506cd9cd149291ea, []int{3} +} +func (m *ExecutorCancelRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ExecutorCancelRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ExecutorCancelRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ExecutorCancelRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecutorCancelRequest.Merge(m, src) +} +func (m *ExecutorCancelRequest) XXX_Size() int { + return m.Size() +} +func (m *ExecutorCancelRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ExecutorCancelRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecutorCancelRequest proto.InternalMessageInfo + +func (m *ExecutorCancelRequest) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *ExecutorCancelRequest) GetQueues() []string { + if m != nil { + return m.Queues + } + return nil +} + +func (m *ExecutorCancelRequest) GetPriorityClasses() []string { + if m != nil { + return m.PriorityClasses + } + return nil +} + func init() { proto.RegisterType((*ExecutorSettingsUpsertRequest)(nil), "api.ExecutorSettingsUpsertRequest") proto.RegisterType((*ExecutorSettingsDeleteRequest)(nil), "api.ExecutorSettingsDeleteRequest") + proto.RegisterType((*ExecutorPreemptRequest)(nil), "api.ExecutorPreemptRequest") + proto.RegisterType((*ExecutorCancelRequest)(nil), "api.ExecutorCancelRequest") } func init() { proto.RegisterFile("pkg/api/executor.proto", fileDescriptor_506cd9cd149291ea) } var fileDescriptor_506cd9cd149291ea = []byte{ - // 369 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x3d, 0x4f, 0x02, 0x41, - 0x10, 0x65, 0xc1, 0x18, 0xbc, 0x18, 0x8b, 0x2d, 0x2e, 0xe4, 0xc4, 0x03, 0xaf, 0x10, 0x42, 0xcc, - 0x6d, 0xc4, 0xce, 0xc2, 0x44, 0x22, 0xb1, 0xc7, 0xd8, 0xd8, 0x98, 0xe5, 0x18, 0xcf, 0x55, 0x6e, - 0x77, 0xbd, 0xdb, 0x33, 0x1a, 0x43, 0x63, 0x65, 0x69, 0xe2, 0xcf, 0xf1, 0x0f, 0x58, 0x92, 0xd8, - 0x58, 0x11, 0x03, 0x56, 0xfc, 0x0a, 0x73, 0x1f, 0x10, 0xd0, 0x44, 0xb4, 0xdb, 0x99, 0xf7, 0xf6, - 0xbd, 0x99, 0xb7, 0xab, 0xe9, 0xf2, 0xca, 0x25, 0x54, 0x32, 0x02, 0xb7, 0xe0, 0x84, 0x4a, 0xf8, - 0xb6, 0xf4, 0x85, 0x12, 0x38, 0x47, 0x25, 0x33, 0x8a, 0xae, 0x10, 0x6e, 0x17, 0x62, 0x9c, 0x72, - 0x2e, 0x14, 0x55, 0x4c, 0xf0, 0x20, 0xa1, 0x18, 0xeb, 0x29, 0x1a, 0x57, 0xed, 0xf0, 0x9c, 0x80, - 0x27, 0xd5, 0x5d, 0x02, 0x5a, 0x2f, 0x48, 0xdb, 0x68, 0xa6, 0x92, 0xc7, 0xa0, 0x14, 0xe3, 0x6e, - 0x70, 0x22, 0x03, 0xf0, 0x55, 0x0b, 0xae, 0x43, 0x08, 0x14, 0xde, 0xd2, 0x96, 0x38, 0xf5, 0xa0, - 0x80, 0xca, 0xa8, 0xba, 0xd2, 0xc0, 0xe3, 0x41, 0x69, 0x2d, 0xaa, 0xb7, 0x85, 0xc7, 0x54, 0xac, - 0xd4, 0x8a, 0x71, 0x5c, 0xd7, 0xf2, 0x8e, 0xf0, 0x3b, 0x82, 0x43, 0xa7, 0x90, 0x2d, 0xa3, 0x6a, - 0xbe, 0xa1, 0x8f, 0x07, 0x25, 0x3c, 0xe9, 0xcd, 0xf0, 0xa7, 0x3c, 0xbc, 0xaf, 0xad, 0x26, 0xe7, - 0x16, 0xd0, 0x40, 0xf0, 0x42, 0x2e, 0xf6, 0x30, 0xc6, 0x83, 0x92, 0x3e, 0xdb, 0x9f, 0xb9, 0x3b, - 0xc7, 0xb7, 0x8e, 0x7e, 0x0e, 0x7f, 0x08, 0x5d, 0x50, 0xf0, 0xcf, 0xe1, 0xeb, 0x8f, 0x59, 0x2d, - 0x3f, 0x51, 0xc2, 0x3d, 0x4d, 0x4f, 0x22, 0xf8, 0xae, 0x8d, 0x2d, 0x9b, 0x4a, 0x66, 0xff, 0x9a, - 0x97, 0xa1, 0xdb, 0x49, 0xde, 0xf6, 0x24, 0x6f, 0xbb, 0x19, 0x19, 0x59, 0x95, 0x87, 0xb7, 0xcf, - 0xe7, 0xec, 0xa6, 0x51, 0x24, 0x37, 0x3b, 0xd3, 0x57, 0x3c, 0x0b, 0x52, 0x0d, 0x72, 0x1f, 0xcd, - 0xd1, 0xdb, 0x43, 0xb5, 0xc8, 0x3e, 0x59, 0xe2, 0x8f, 0xf6, 0x73, 0x1b, 0x2f, 0xb2, 0xaf, 0x2d, - 0xb2, 0x6f, 0x1c, 0xbc, 0x0e, 0x4d, 0xd4, 0x1f, 0x9a, 0xe8, 0x63, 0x68, 0xa2, 0xa7, 0x91, 0x99, - 0xe9, 0x8f, 0xcc, 0xcc, 0xfb, 0xc8, 0xcc, 0x9c, 0x56, 0x5c, 0xa6, 0x2e, 0xc2, 0xb6, 0xed, 0x08, - 0x8f, 0x50, 0xdf, 0xa3, 0x1d, 0x2a, 0x7d, 0x71, 0x09, 0x8e, 0x4a, 0x2b, 0x92, 0xfe, 0xd1, 0xf6, - 0x72, 0xec, 0xbd, 0xfb, 0x15, 0x00, 0x00, 0xff, 0xff, 0x63, 0xab, 0xb0, 0x13, 0xb5, 0x02, 0x00, + // 513 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x94, 0x31, 0x6f, 0xd3, 0x40, + 0x14, 0xc7, 0x73, 0x09, 0xaa, 0xd2, 0x13, 0x82, 0x62, 0x81, 0x65, 0x9c, 0xd6, 0x0e, 0x96, 0x68, + 0xa3, 0xaa, 0xb2, 0x45, 0xd9, 0x18, 0x90, 0x48, 0xa9, 0x3a, 0x82, 0x82, 0x58, 0x58, 0xd0, 0xc5, + 0x79, 0x18, 0x43, 0xec, 0xbb, 0xfa, 0xce, 0x88, 0x0a, 0x75, 0xe1, 0x13, 0x20, 0xf1, 0x5d, 0x60, + 0xe0, 0x0b, 0x30, 0x56, 0x62, 0x61, 0xb2, 0x50, 0xc2, 0x82, 0x3f, 0x05, 0xf2, 0x9d, 0x1d, 0xec, + 0xa0, 0x52, 0x3a, 0x76, 0xcb, 0xbd, 0xf7, 0xff, 0xbf, 0xdf, 0xbb, 0xf8, 0x6f, 0x63, 0x9d, 0xbd, + 0x0e, 0x3c, 0xc2, 0x42, 0x0f, 0xde, 0x82, 0x9f, 0x0a, 0x9a, 0xb8, 0x2c, 0xa1, 0x82, 0x6a, 0x1d, + 0xc2, 0x42, 0x73, 0x3d, 0xa0, 0x34, 0x98, 0x82, 0xec, 0x93, 0x38, 0xa6, 0x82, 0x88, 0x90, 0xc6, + 0x5c, 0x49, 0xcc, 0x5e, 0xd9, 0x95, 0xa7, 0x71, 0xfa, 0xc2, 0x83, 0x88, 0x89, 0x23, 0xd5, 0x74, + 0xbe, 0x20, 0xbc, 0xb1, 0x5f, 0x8e, 0x7c, 0x02, 0x42, 0x84, 0x71, 0xc0, 0x9f, 0x32, 0x0e, 0x89, + 0x18, 0xc1, 0x61, 0x0a, 0x5c, 0x68, 0x9b, 0xf8, 0x52, 0x4c, 0x22, 0x30, 0x50, 0x1f, 0x0d, 0x56, + 0x87, 0x5a, 0x9e, 0xd9, 0x57, 0x8a, 0xf3, 0x0e, 0x8d, 0x42, 0x21, 0x27, 0x8d, 0x64, 0x5f, 0xdb, + 0xc5, 0x5d, 0x9f, 0x26, 0x13, 0x1a, 0xc3, 0xc4, 0x68, 0xf7, 0xd1, 0xa0, 0x3b, 0xd4, 0xf3, 0xcc, + 0xd6, 0xaa, 0x5a, 0x4d, 0xbf, 0xd0, 0x69, 0xf7, 0xf1, 0x65, 0xf5, 0x7b, 0x04, 0x84, 0xd3, 0xd8, + 0xe8, 0x48, 0x86, 0x99, 0x67, 0xb6, 0x5e, 0xaf, 0xd7, 0xbc, 0x0d, 0xbd, 0x73, 0xf0, 0xf7, 0xf2, + 0x0f, 0x61, 0x0a, 0x02, 0xce, 0xb9, 0xbc, 0xf3, 0x19, 0x61, 0xbd, 0x9a, 0xf4, 0x38, 0x81, 0xa2, + 0x75, 0xde, 0xfb, 0xef, 0xe0, 0x95, 0xc3, 0x14, 0x52, 0xe0, 0x46, 0xbb, 0xdf, 0x19, 0xac, 0x0e, + 0xaf, 0xe7, 0x99, 0xbd, 0xa6, 0x2a, 0x35, 0x6d, 0xa9, 0xd1, 0x0e, 0xf0, 0x55, 0x96, 0x84, 0x34, + 0x09, 0xc5, 0xd1, 0xde, 0x94, 0x70, 0x0e, 0xdc, 0xe8, 0x48, 0xdb, 0x46, 0x9e, 0xd9, 0x37, 0x97, + 0x5a, 0x35, 0xff, 0xb2, 0xcb, 0xf9, 0x84, 0xf0, 0x8d, 0x6a, 0xf3, 0x3d, 0x12, 0xfb, 0x30, 0xbd, + 0x18, 0x8b, 0xef, 0xfe, 0xea, 0xe0, 0x6e, 0xb5, 0xb8, 0x76, 0x8c, 0x75, 0x95, 0xba, 0xe5, 0xc7, + 0xa9, 0x39, 0x2e, 0x61, 0xa1, 0xfb, 0xcf, 0x88, 0x9a, 0xba, 0xab, 0x22, 0xee, 0x56, 0x11, 0x77, + 0xf7, 0x0b, 0xa6, 0xb3, 0xf5, 0xfe, 0xdb, 0xcf, 0x8f, 0xed, 0x5b, 0xe6, 0xba, 0xf7, 0xe6, 0xce, + 0xe2, 0xc5, 0x79, 0xce, 0xcb, 0x19, 0xde, 0xbb, 0xe2, 0xfa, 0xc7, 0xf7, 0xd0, 0x76, 0x81, 0x57, + 0xb9, 0xf9, 0x4f, 0x7c, 0x23, 0x64, 0x67, 0xe1, 0xb7, 0xcf, 0xc4, 0x33, 0x7c, 0xad, 0x0c, 0xdd, + 0xa3, 0x78, 0xf1, 0x97, 0xf4, 0x1a, 0xe4, 0x66, 0x28, 0x4f, 0x45, 0x6e, 0x4a, 0x64, 0xdf, 0xe9, + 0xd5, 0x91, 0x1e, 0x53, 0xe6, 0x1a, 0x31, 0xc2, 0x6b, 0x2a, 0x2c, 0x35, 0xa0, 0xd9, 0x00, 0x36, + 0xb2, 0x74, 0x2a, 0xef, 0xb6, 0xe4, 0xd9, 0x8e, 0xd9, 0xe0, 0xf9, 0xd2, 0xfb, 0x07, 0x37, 0x7c, + 0xf0, 0x75, 0x66, 0xa1, 0x93, 0x99, 0x85, 0x7e, 0xcc, 0x2c, 0xf4, 0x61, 0x6e, 0xb5, 0x4e, 0xe6, + 0x56, 0xeb, 0xfb, 0xdc, 0x6a, 0x3d, 0xdb, 0x0a, 0x42, 0xf1, 0x32, 0x1d, 0xbb, 0x3e, 0x8d, 0x3c, + 0x92, 0x44, 0x64, 0x42, 0x58, 0x42, 0x5f, 0x81, 0x2f, 0xca, 0x93, 0x57, 0x7e, 0xf7, 0xc6, 0x2b, + 0x92, 0x7c, 0xf7, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xed, 0x8b, 0x50, 0xc7, 0x09, 0x05, 0x00, 0x00, } @@ -182,6 +315,8 @@ const _ = grpc.SupportPackageIsVersion4 type ExecutorClient interface { UpsertExecutorSettings(ctx context.Context, in *ExecutorSettingsUpsertRequest, opts ...grpc.CallOption) (*types.Empty, error) DeleteExecutorSettings(ctx context.Context, in *ExecutorSettingsDeleteRequest, opts ...grpc.CallOption) (*types.Empty, error) + PreemptOnExecutor(ctx context.Context, in *ExecutorPreemptRequest, opts ...grpc.CallOption) (*types.Empty, error) + CancelOnExecutor(ctx context.Context, in *ExecutorCancelRequest, opts ...grpc.CallOption) (*types.Empty, error) } type executorClient struct { @@ -210,10 +345,30 @@ func (c *executorClient) DeleteExecutorSettings(ctx context.Context, in *Executo return out, nil } +func (c *executorClient) PreemptOnExecutor(ctx context.Context, in *ExecutorPreemptRequest, opts ...grpc.CallOption) (*types.Empty, error) { + out := new(types.Empty) + err := c.cc.Invoke(ctx, "/api.Executor/PreemptOnExecutor", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) CancelOnExecutor(ctx context.Context, in *ExecutorCancelRequest, opts ...grpc.CallOption) (*types.Empty, error) { + out := new(types.Empty) + err := c.cc.Invoke(ctx, "/api.Executor/CancelOnExecutor", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ExecutorServer is the server API for Executor service. type ExecutorServer interface { UpsertExecutorSettings(context.Context, *ExecutorSettingsUpsertRequest) (*types.Empty, error) DeleteExecutorSettings(context.Context, *ExecutorSettingsDeleteRequest) (*types.Empty, error) + PreemptOnExecutor(context.Context, *ExecutorPreemptRequest) (*types.Empty, error) + CancelOnExecutor(context.Context, *ExecutorCancelRequest) (*types.Empty, error) } // UnimplementedExecutorServer can be embedded to have forward compatible implementations. @@ -226,6 +381,12 @@ func (*UnimplementedExecutorServer) UpsertExecutorSettings(ctx context.Context, func (*UnimplementedExecutorServer) DeleteExecutorSettings(ctx context.Context, req *ExecutorSettingsDeleteRequest) (*types.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method DeleteExecutorSettings not implemented") } +func (*UnimplementedExecutorServer) PreemptOnExecutor(ctx context.Context, req *ExecutorPreemptRequest) (*types.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method PreemptOnExecutor not implemented") +} +func (*UnimplementedExecutorServer) CancelOnExecutor(ctx context.Context, req *ExecutorCancelRequest) (*types.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method CancelOnExecutor not implemented") +} func RegisterExecutorServer(s *grpc.Server, srv ExecutorServer) { s.RegisterService(&_Executor_serviceDesc, srv) @@ -267,6 +428,42 @@ func _Executor_DeleteExecutorSettings_Handler(srv interface{}, ctx context.Conte return interceptor(ctx, in, info, handler) } +func _Executor_PreemptOnExecutor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExecutorPreemptRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).PreemptOnExecutor(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/api.Executor/PreemptOnExecutor", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).PreemptOnExecutor(ctx, req.(*ExecutorPreemptRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_CancelOnExecutor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExecutorCancelRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).CancelOnExecutor(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/api.Executor/CancelOnExecutor", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).CancelOnExecutor(ctx, req.(*ExecutorCancelRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Executor_serviceDesc = grpc.ServiceDesc{ ServiceName: "api.Executor", HandlerType: (*ExecutorServer)(nil), @@ -279,6 +476,14 @@ var _Executor_serviceDesc = grpc.ServiceDesc{ MethodName: "DeleteExecutorSettings", Handler: _Executor_DeleteExecutorSettings_Handler, }, + { + MethodName: "PreemptOnExecutor", + Handler: _Executor_PreemptOnExecutor_Handler, + }, + { + MethodName: "CancelOnExecutor", + Handler: _Executor_CancelOnExecutor_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "pkg/api/executor.proto", @@ -361,6 +566,102 @@ func (m *ExecutorSettingsDeleteRequest) MarshalToSizedBuffer(dAtA []byte) (int, return len(dAtA) - i, nil } +func (m *ExecutorPreemptRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ExecutorPreemptRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ExecutorPreemptRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.PriorityClasses) > 0 { + for iNdEx := len(m.PriorityClasses) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.PriorityClasses[iNdEx]) + copy(dAtA[i:], m.PriorityClasses[iNdEx]) + i = encodeVarintExecutor(dAtA, i, uint64(len(m.PriorityClasses[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } + if len(m.Queues) > 0 { + for iNdEx := len(m.Queues) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Queues[iNdEx]) + copy(dAtA[i:], m.Queues[iNdEx]) + i = encodeVarintExecutor(dAtA, i, uint64(len(m.Queues[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintExecutor(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *ExecutorCancelRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ExecutorCancelRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ExecutorCancelRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.PriorityClasses) > 0 { + for iNdEx := len(m.PriorityClasses) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.PriorityClasses[iNdEx]) + copy(dAtA[i:], m.PriorityClasses[iNdEx]) + i = encodeVarintExecutor(dAtA, i, uint64(len(m.PriorityClasses[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } + if len(m.Queues) > 0 { + for iNdEx := len(m.Queues) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Queues[iNdEx]) + copy(dAtA[i:], m.Queues[iNdEx]) + i = encodeVarintExecutor(dAtA, i, uint64(len(m.Queues[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintExecutor(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintExecutor(dAtA []byte, offset int, v uint64) int { offset -= sovExecutor(v) base := offset @@ -405,6 +706,56 @@ func (m *ExecutorSettingsDeleteRequest) Size() (n int) { return n } +func (m *ExecutorPreemptRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovExecutor(uint64(l)) + } + if len(m.Queues) > 0 { + for _, s := range m.Queues { + l = len(s) + n += 1 + l + sovExecutor(uint64(l)) + } + } + if len(m.PriorityClasses) > 0 { + for _, s := range m.PriorityClasses { + l = len(s) + n += 1 + l + sovExecutor(uint64(l)) + } + } + return n +} + +func (m *ExecutorCancelRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovExecutor(uint64(l)) + } + if len(m.Queues) > 0 { + for _, s := range m.Queues { + l = len(s) + n += 1 + l + sovExecutor(uint64(l)) + } + } + if len(m.PriorityClasses) > 0 { + for _, s := range m.PriorityClasses { + l = len(s) + n += 1 + l + sovExecutor(uint64(l)) + } + } + return n +} + func sovExecutor(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -627,6 +978,298 @@ func (m *ExecutorSettingsDeleteRequest) Unmarshal(dAtA []byte) error { } return nil } +func (m *ExecutorPreemptRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ExecutorPreemptRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ExecutorPreemptRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthExecutor + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthExecutor + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Queues", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthExecutor + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthExecutor + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Queues = append(m.Queues, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PriorityClasses", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthExecutor + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthExecutor + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PriorityClasses = append(m.PriorityClasses, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipExecutor(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthExecutor + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ExecutorCancelRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ExecutorCancelRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ExecutorCancelRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthExecutor + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthExecutor + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Queues", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthExecutor + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthExecutor + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Queues = append(m.Queues, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PriorityClasses", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowExecutor + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthExecutor + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthExecutor + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PriorityClasses = append(m.PriorityClasses, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipExecutor(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthExecutor + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipExecutor(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/api/executor.proto b/pkg/api/executor.proto index e3e5b960d8b..1e6f2c91160 100644 --- a/pkg/api/executor.proto +++ b/pkg/api/executor.proto @@ -19,6 +19,18 @@ service Executor { body: "*" }; } + rpc PreemptOnExecutor (ExecutorPreemptRequest) returns (google.protobuf.Empty) { + option (google.api.http) = { + post: "/v1/executor/preempt/{name}" + body: "*" + }; + } + rpc CancelOnExecutor (ExecutorCancelRequest) returns (google.protobuf.Empty) { + option (google.api.http) = { + post: "/v1/executor/cancel/{name}" + body: "*" + }; + } } message ExecutorSettingsUpsertRequest { @@ -30,3 +42,17 @@ message ExecutorSettingsUpsertRequest { message ExecutorSettingsDeleteRequest { string name = 1; } + +// Jobs on the specified executor matching both the provided queues and priority classes will be preempted +message ExecutorPreemptRequest { + string name = 1; + repeated string queues = 2; + repeated string priorityClasses = 3; +} + +// Jobs on the specified executor matching both the provided queues and priority classes will be cancelled +message ExecutorCancelRequest { + string name = 1; + repeated string queues = 2; + repeated string priorityClasses = 3; +} diff --git a/pkg/client/executor/cancel.go b/pkg/client/executor/cancel.go new file mode 100644 index 00000000000..5678df93360 --- /dev/null +++ b/pkg/client/executor/cancel.go @@ -0,0 +1,39 @@ +package executor + +import ( + "fmt" + + "github.com/armadaproject/armada/internal/common" + "github.com/armadaproject/armada/pkg/api" + "github.com/armadaproject/armada/pkg/client" +) + +type CancelAPI func(string, []string, []string) error + +func CancelOnExecutor(getConnectionDetails client.ConnectionDetails) CancelAPI { + return func(executor string, queues []string, priorityClasses []string) error { + connectionDetails, err := getConnectionDetails() + if err != nil { + return fmt.Errorf("failed to obtain api connection details: %s", err) + } + conn, err := client.CreateApiConnection(connectionDetails) + if err != nil { + return fmt.Errorf("failed to connect to api because %s", err) + } + defer conn.Close() + + ctx, cancel := common.ContextWithDefaultTimeout() + defer cancel() + + executorClient := api.NewExecutorClient(conn) + newCancelOnExecutor := &api.ExecutorCancelRequest{ + Name: executor, + Queues: queues, + PriorityClasses: priorityClasses, + } + if _, err = executorClient.CancelOnExecutor(ctx, newCancelOnExecutor); err != nil { + return err + } + return nil + } +} diff --git a/pkg/client/executor/preempt.go b/pkg/client/executor/preempt.go new file mode 100644 index 00000000000..c5dec15a1b7 --- /dev/null +++ b/pkg/client/executor/preempt.go @@ -0,0 +1,39 @@ +package executor + +import ( + "fmt" + + "github.com/armadaproject/armada/internal/common" + "github.com/armadaproject/armada/pkg/api" + "github.com/armadaproject/armada/pkg/client" +) + +type PreemptAPI func(string, []string, []string) error + +func PreemptOnExecutor(getConnectionDetails client.ConnectionDetails) PreemptAPI { + return func(executor string, queues []string, priorityClasses []string) error { + connectionDetails, err := getConnectionDetails() + if err != nil { + return fmt.Errorf("failed to obtain api connection details: %s", err) + } + conn, err := client.CreateApiConnection(connectionDetails) + if err != nil { + return fmt.Errorf("failed to connect to api because %s", err) + } + defer conn.Close() + + ctx, cancel := common.ContextWithDefaultTimeout() + defer cancel() + + executorClient := api.NewExecutorClient(conn) + newPreemptOnExecutor := &api.ExecutorPreemptRequest{ + Name: executor, + Queues: queues, + PriorityClasses: priorityClasses, + } + if _, err = executorClient.PreemptOnExecutor(ctx, newPreemptOnExecutor); err != nil { + return err + } + return nil + } +} diff --git a/pkg/controlplaneevents/events.pb.go b/pkg/controlplaneevents/events.pb.go index 45e2e1b0bd3..d5812bba0b5 100644 --- a/pkg/controlplaneevents/events.pb.go +++ b/pkg/controlplaneevents/events.pb.go @@ -29,6 +29,8 @@ type Event struct { // Types that are valid to be assigned to Event: // *Event_ExecutorSettingsUpsert // *Event_ExecutorSettingsDelete + // *Event_PreemptOnExecutor + // *Event_CancelOnExecutor Event isEvent_Event `protobuf_oneof:"event"` } @@ -77,9 +79,17 @@ type Event_ExecutorSettingsUpsert struct { type Event_ExecutorSettingsDelete struct { ExecutorSettingsDelete *ExecutorSettingsDelete `protobuf:"bytes,3,opt,name=executorSettingsDelete,proto3,oneof" json:"executorSettingsDelete,omitempty"` } +type Event_PreemptOnExecutor struct { + PreemptOnExecutor *PreemptOnExecutor `protobuf:"bytes,4,opt,name=preemptOnExecutor,proto3,oneof" json:"preemptOnExecutor,omitempty"` +} +type Event_CancelOnExecutor struct { + CancelOnExecutor *CancelOnExecutor `protobuf:"bytes,5,opt,name=cancelOnExecutor,proto3,oneof" json:"cancelOnExecutor,omitempty"` +} func (*Event_ExecutorSettingsUpsert) isEvent_Event() {} func (*Event_ExecutorSettingsDelete) isEvent_Event() {} +func (*Event_PreemptOnExecutor) isEvent_Event() {} +func (*Event_CancelOnExecutor) isEvent_Event() {} func (m *Event) GetEvent() isEvent_Event { if m != nil { @@ -109,11 +119,27 @@ func (m *Event) GetExecutorSettingsDelete() *ExecutorSettingsDelete { return nil } +func (m *Event) GetPreemptOnExecutor() *PreemptOnExecutor { + if x, ok := m.GetEvent().(*Event_PreemptOnExecutor); ok { + return x.PreemptOnExecutor + } + return nil +} + +func (m *Event) GetCancelOnExecutor() *CancelOnExecutor { + if x, ok := m.GetEvent().(*Event_CancelOnExecutor); ok { + return x.CancelOnExecutor + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*Event) XXX_OneofWrappers() []interface{} { return []interface{}{ (*Event_ExecutorSettingsUpsert)(nil), (*Event_ExecutorSettingsDelete)(nil), + (*Event_PreemptOnExecutor)(nil), + (*Event_CancelOnExecutor)(nil), } } @@ -229,10 +255,132 @@ func (m *ExecutorSettingsDelete) GetName() string { return "" } +type PreemptOnExecutor struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Queues []string `protobuf:"bytes,2,rep,name=queues,proto3" json:"queues,omitempty"` + PriorityClasses []string `protobuf:"bytes,3,rep,name=priorityClasses,proto3" json:"priorityClasses,omitempty"` +} + +func (m *PreemptOnExecutor) Reset() { *m = PreemptOnExecutor{} } +func (m *PreemptOnExecutor) String() string { return proto.CompactTextString(m) } +func (*PreemptOnExecutor) ProtoMessage() {} +func (*PreemptOnExecutor) Descriptor() ([]byte, []int) { + return fileDescriptor_2ccee8bdbf348752, []int{3} +} +func (m *PreemptOnExecutor) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PreemptOnExecutor) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PreemptOnExecutor.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PreemptOnExecutor) XXX_Merge(src proto.Message) { + xxx_messageInfo_PreemptOnExecutor.Merge(m, src) +} +func (m *PreemptOnExecutor) XXX_Size() int { + return m.Size() +} +func (m *PreemptOnExecutor) XXX_DiscardUnknown() { + xxx_messageInfo_PreemptOnExecutor.DiscardUnknown(m) +} + +var xxx_messageInfo_PreemptOnExecutor proto.InternalMessageInfo + +func (m *PreemptOnExecutor) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *PreemptOnExecutor) GetQueues() []string { + if m != nil { + return m.Queues + } + return nil +} + +func (m *PreemptOnExecutor) GetPriorityClasses() []string { + if m != nil { + return m.PriorityClasses + } + return nil +} + +type CancelOnExecutor struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Queues []string `protobuf:"bytes,2,rep,name=queues,proto3" json:"queues,omitempty"` + PriorityClasses []string `protobuf:"bytes,3,rep,name=priorityClasses,proto3" json:"priorityClasses,omitempty"` +} + +func (m *CancelOnExecutor) Reset() { *m = CancelOnExecutor{} } +func (m *CancelOnExecutor) String() string { return proto.CompactTextString(m) } +func (*CancelOnExecutor) ProtoMessage() {} +func (*CancelOnExecutor) Descriptor() ([]byte, []int) { + return fileDescriptor_2ccee8bdbf348752, []int{4} +} +func (m *CancelOnExecutor) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CancelOnExecutor) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CancelOnExecutor.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CancelOnExecutor) XXX_Merge(src proto.Message) { + xxx_messageInfo_CancelOnExecutor.Merge(m, src) +} +func (m *CancelOnExecutor) XXX_Size() int { + return m.Size() +} +func (m *CancelOnExecutor) XXX_DiscardUnknown() { + xxx_messageInfo_CancelOnExecutor.DiscardUnknown(m) +} + +var xxx_messageInfo_CancelOnExecutor proto.InternalMessageInfo + +func (m *CancelOnExecutor) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *CancelOnExecutor) GetQueues() []string { + if m != nil { + return m.Queues + } + return nil +} + +func (m *CancelOnExecutor) GetPriorityClasses() []string { + if m != nil { + return m.PriorityClasses + } + return nil +} + func init() { proto.RegisterType((*Event)(nil), "controlplaneevents.Event") proto.RegisterType((*ExecutorSettingsUpsert)(nil), "controlplaneevents.ExecutorSettingsUpsert") proto.RegisterType((*ExecutorSettingsDelete)(nil), "controlplaneevents.ExecutorSettingsDelete") + proto.RegisterType((*PreemptOnExecutor)(nil), "controlplaneevents.PreemptOnExecutor") + proto.RegisterType((*CancelOnExecutor)(nil), "controlplaneevents.CancelOnExecutor") } func init() { @@ -240,34 +388,43 @@ func init() { } var fileDescriptor_2ccee8bdbf348752 = []byte{ - // 431 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xb1, 0x8e, 0xd3, 0x40, - 0x10, 0x86, 0xed, 0x70, 0xc7, 0x5d, 0x16, 0x84, 0xc4, 0x22, 0x8c, 0x95, 0xc2, 0x3e, 0xe5, 0x10, - 0x42, 0x08, 0xad, 0xa5, 0x43, 0x50, 0x22, 0x30, 0x9c, 0x04, 0xa2, 0x39, 0x05, 0xae, 0xa1, 0xdb, - 0xd8, 0x83, 0x31, 0xd8, 0x5e, 0x6b, 0x77, 0x72, 0x22, 0xaf, 0x40, 0xc5, 0x73, 0xf0, 0x24, 0x94, - 0x29, 0xa9, 0x2c, 0x94, 0x74, 0xe6, 0x01, 0x68, 0x51, 0x76, 0xe3, 0xc4, 0x28, 0xe6, 0x94, 0xca, - 0xde, 0x99, 0xef, 0x9f, 0xf9, 0x67, 0xec, 0x25, 0xc7, 0xe5, 0xe7, 0x24, 0x88, 0x44, 0x81, 0x52, - 0x64, 0x65, 0xc6, 0x0b, 0x80, 0x0b, 0x28, 0x50, 0x05, 0xe6, 0xc1, 0x4a, 0x29, 0x50, 0x50, 0xba, - 0x0d, 0x0c, 0xfc, 0x44, 0x88, 0x24, 0x83, 0x40, 0x13, 0xe3, 0xc9, 0x87, 0x00, 0xd3, 0x1c, 0x14, - 0xf2, 0xbc, 0x34, 0xa2, 0xe1, 0x9f, 0x1e, 0xd9, 0x3f, 0x5d, 0xb2, 0xf4, 0x0d, 0x39, 0x88, 0x24, - 0x70, 0x84, 0xd8, 0xb5, 0x8f, 0xec, 0xfb, 0xd7, 0x4e, 0x06, 0xcc, 0x88, 0x59, 0x23, 0x66, 0xef, - 0x1a, 0x71, 0x78, 0xbb, 0xae, 0xfc, 0x9b, 0x2b, 0xfc, 0xa1, 0xc8, 0x53, 0x84, 0xbc, 0xc4, 0xe9, - 0xa8, 0xa9, 0x40, 0xbf, 0xda, 0xc4, 0x81, 0x2f, 0x10, 0x4d, 0x50, 0xc8, 0xb7, 0x80, 0x98, 0x16, - 0x89, 0x3a, 0x2f, 0x15, 0x48, 0x74, 0x7b, 0xba, 0xf8, 0x03, 0xb6, 0xed, 0x96, 0x9d, 0x76, 0x2a, - 0xc2, 0xbb, 0x75, 0xe5, 0x1f, 0x75, 0x57, 0xdb, 0xf4, 0x7e, 0x65, 0x8d, 0xfe, 0xd3, 0xb1, 0xd3, - 0xcc, 0x4b, 0xc8, 0x00, 0xc1, 0xbd, 0xb2, 0xbb, 0x19, 0xa3, 0xe8, 0x36, 0x63, 0x72, 0x97, 0x9b, - 0x59, 0xe9, 0x0f, 0xc8, 0xbe, 0x6e, 0x30, 0xfc, 0x6d, 0x13, 0xa7, 0x7b, 0x60, 0x7a, 0x8f, 0xec, - 0x15, 0x3c, 0x07, 0xfd, 0x1d, 0xfa, 0x21, 0xad, 0x2b, 0xff, 0xc6, 0xf2, 0xdc, 0x5a, 0xb4, 0xce, - 0xd3, 0x13, 0x72, 0x18, 0x09, 0x19, 0x8b, 0x02, 0x62, 0xbd, 0xd6, 0xc3, 0xd0, 0xa9, 0x2b, 0x9f, - 0x36, 0xb1, 0x16, 0xbf, 0xe6, 0xe8, 0x53, 0x72, 0xdd, 0xbc, 0x8f, 0x80, 0x2b, 0x51, 0xe8, 0x0d, - 0xf4, 0xc3, 0x41, 0x5d, 0xf9, 0x4e, 0x3b, 0xde, 0xd2, 0xfe, 0xc3, 0xd3, 0xc7, 0xa4, 0xaf, 0x00, - 0xc3, 0xe9, 0xb9, 0x02, 0xe9, 0xee, 0x69, 0xf1, 0x9d, 0xba, 0xf2, 0x6f, 0xad, 0x83, 0x2d, 0xe5, - 0x86, 0x1c, 0x3e, 0xdb, 0x1e, 0xd6, 0x2c, 0x64, 0xd7, 0x61, 0xc3, 0x8b, 0x1f, 0x73, 0xcf, 0x9e, - 0xcd, 0x3d, 0xfb, 0xd7, 0xdc, 0xb3, 0xbf, 0x2d, 0x3c, 0x6b, 0xb6, 0xf0, 0xac, 0x9f, 0x0b, 0xcf, - 0x7a, 0xff, 0x24, 0x49, 0xf1, 0xe3, 0x64, 0xcc, 0x22, 0x91, 0x07, 0x5c, 0xe6, 0x3c, 0xe6, 0xa5, - 0x14, 0x9f, 0x20, 0xc2, 0xd5, 0x29, 0xe8, 0xbe, 0x3d, 0xdf, 0x7b, 0xc7, 0xcf, 0x75, 0xfe, 0xcc, - 0xd0, 0xec, 0xb5, 0x60, 0x2f, 0x0c, 0x75, 0xb6, 0xa4, 0xf4, 0xb5, 0x50, 0xe3, 0xab, 0xfa, 0xf7, - 0x7f, 0xf4, 0x37, 0x00, 0x00, 0xff, 0xff, 0xdc, 0xdc, 0xc1, 0xeb, 0x84, 0x03, 0x00, 0x00, + // 564 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x94, 0x41, 0x8f, 0xd2, 0x40, + 0x14, 0xc7, 0xe9, 0x02, 0xbb, 0xcb, 0x68, 0x14, 0x46, 0xc5, 0x8a, 0xb1, 0x43, 0xd8, 0xd5, 0x6c, + 0xcc, 0xa6, 0x4d, 0xd6, 0xe8, 0xd1, 0xe8, 0xe0, 0x46, 0x8d, 0x07, 0x09, 0xba, 0x17, 0x6f, 0xa5, + 0x3c, 0xb1, 0xda, 0x76, 0xea, 0xcc, 0x40, 0xe4, 0x2b, 0x78, 0xf2, 0x73, 0x78, 0xd3, 0x4f, 0xe1, + 0x71, 0x8f, 0x9e, 0x1a, 0x03, 0xb7, 0xfa, 0x25, 0x0c, 0x53, 0xd8, 0x2d, 0x6d, 0x35, 0x1c, 0x3d, + 0x41, 0xe7, 0xfd, 0xfe, 0xef, 0xff, 0x6f, 0xdf, 0xcb, 0xa0, 0xbd, 0xf0, 0xc3, 0xc8, 0x72, 0x58, + 0x20, 0x39, 0xf3, 0x42, 0xcf, 0x0e, 0x00, 0x26, 0x10, 0x48, 0x61, 0x25, 0x3f, 0x66, 0xc8, 0x99, + 0x64, 0x18, 0xe7, 0x81, 0x16, 0x19, 0x31, 0x36, 0xf2, 0xc0, 0x52, 0xc4, 0x60, 0xfc, 0xd6, 0x92, + 0xae, 0x0f, 0x42, 0xda, 0x7e, 0x98, 0x88, 0x3a, 0xf3, 0x0a, 0xaa, 0x1e, 0x2f, 0x58, 0xfc, 0x02, + 0xed, 0x38, 0x1c, 0x6c, 0x09, 0x43, 0x5d, 0x6b, 0x6b, 0x07, 0x17, 0x8e, 0x5a, 0x66, 0x22, 0x36, + 0x57, 0x62, 0xf3, 0xf5, 0x4a, 0x4c, 0xaf, 0xc5, 0x11, 0x69, 0x2c, 0xf1, 0x43, 0xe6, 0xbb, 0x12, + 0xfc, 0x50, 0x4e, 0xfb, 0xab, 0x0e, 0xf8, 0xb3, 0x86, 0x9a, 0xf0, 0x09, 0x9c, 0xb1, 0x64, 0xfc, + 0x15, 0x48, 0xe9, 0x06, 0x23, 0x71, 0x12, 0x0a, 0xe0, 0x52, 0xdf, 0x52, 0xcd, 0xef, 0x9a, 0xf9, + 0xb4, 0xe6, 0x71, 0xa1, 0x82, 0xee, 0xc7, 0x11, 0x69, 0x17, 0x77, 0x3b, 0xf7, 0x7e, 0x56, 0xea, + 0xff, 0xc5, 0xb1, 0x30, 0xcc, 0x13, 0xf0, 0x40, 0x82, 0x5e, 0xde, 0x3c, 0x4c, 0xa2, 0x28, 0x0e, + 0x93, 0xd4, 0xfe, 0x1d, 0x26, 0x61, 0xf0, 0x04, 0x35, 0x42, 0x0e, 0x0b, 0xea, 0x65, 0xb0, 0xb2, + 0xd0, 0x2b, 0x2a, 0xc6, 0xed, 0xa2, 0x18, 0xbd, 0x2c, 0x4c, 0x49, 0x1c, 0x91, 0x9b, 0xb9, 0x1e, + 0x6b, 0xe6, 0x79, 0x0b, 0xcc, 0x51, 0xdd, 0xb1, 0x03, 0x07, 0xbc, 0x94, 0x6d, 0x55, 0xd9, 0xee, + 0x17, 0xd9, 0x76, 0x33, 0x2c, 0x35, 0xe2, 0x88, 0xb4, 0xb2, 0x1d, 0xd6, 0x4c, 0x73, 0xfd, 0xe9, + 0x0e, 0xaa, 0xaa, 0x76, 0x9d, 0xdf, 0x1a, 0x6a, 0x16, 0x0f, 0x17, 0xdf, 0x41, 0x95, 0xc0, 0xf6, + 0x41, 0xed, 0x5c, 0x8d, 0xe2, 0x38, 0x22, 0x97, 0x16, 0xcf, 0xa9, 0xa5, 0x52, 0x75, 0x7c, 0x84, + 0x76, 0x1d, 0xc6, 0x87, 0x2c, 0x80, 0xa1, 0x5a, 0xa1, 0x5d, 0xda, 0x8c, 0x23, 0x82, 0x57, 0x67, + 0x29, 0xfe, 0x8c, 0xc3, 0x0f, 0xd1, 0xc5, 0xe4, 0x7f, 0x1f, 0x6c, 0xc1, 0x02, 0x35, 0xed, 0x1a, + 0x6d, 0xc5, 0x11, 0x69, 0xa6, 0xcf, 0x53, 0xda, 0x35, 0x1e, 0xdf, 0x47, 0x35, 0x01, 0x92, 0x4e, + 0x4f, 0x04, 0x24, 0x33, 0xaa, 0xd1, 0xeb, 0x71, 0x44, 0xae, 0x9c, 0x1d, 0xa6, 0x94, 0xe7, 0x64, + 0xe7, 0x51, 0xfe, 0x65, 0x97, 0xc3, 0xdf, 0xf0, 0x65, 0x3b, 0xdf, 0x35, 0xd4, 0xc8, 0x0d, 0x7e, + 0xe3, 0x4f, 0x75, 0x88, 0xb6, 0x3f, 0x8e, 0x61, 0x0c, 0x42, 0xdf, 0x6a, 0x97, 0x0f, 0x6a, 0xf4, + 0x6a, 0x1c, 0x91, 0x7a, 0x72, 0x92, 0x62, 0x97, 0x0c, 0x7e, 0x8a, 0x2e, 0x87, 0xdc, 0x65, 0xdc, + 0x95, 0xd3, 0xae, 0x67, 0x0b, 0x01, 0x42, 0x2f, 0x2b, 0xd9, 0xad, 0x38, 0x22, 0x37, 0x32, 0xa5, + 0x94, 0x3e, 0xab, 0xea, 0x7c, 0xd3, 0x50, 0x3d, 0xbb, 0x36, 0xff, 0x79, 0x66, 0x3a, 0xf9, 0x31, + 0x33, 0xb4, 0xd3, 0x99, 0xa1, 0xfd, 0x9a, 0x19, 0xda, 0x97, 0xb9, 0x51, 0x3a, 0x9d, 0x1b, 0xa5, + 0x9f, 0x73, 0xa3, 0xf4, 0xe6, 0xc1, 0xc8, 0x95, 0xef, 0xc6, 0x03, 0xd3, 0x61, 0xbe, 0x65, 0x73, + 0xdf, 0x1e, 0xda, 0x21, 0x67, 0xef, 0xc1, 0x91, 0xcb, 0x27, 0xab, 0xf8, 0x4a, 0xfe, 0xba, 0xb5, + 0xf7, 0x58, 0xd5, 0x7b, 0x09, 0x6d, 0x3e, 0x67, 0x66, 0x37, 0xa1, 0x7a, 0x0b, 0x4a, 0xdd, 0xb5, + 0x62, 0xb0, 0xad, 0xee, 0xd4, 0x7b, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x09, 0xe8, 0xfb, 0x3a, + 0xd9, 0x05, 0x00, 0x00, } func (m *Event) Marshal() (dAtA []byte, err error) { @@ -356,6 +513,48 @@ func (m *Event_ExecutorSettingsDelete) MarshalToSizedBuffer(dAtA []byte) (int, e } return len(dAtA) - i, nil } +func (m *Event_PreemptOnExecutor) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Event_PreemptOnExecutor) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.PreemptOnExecutor != nil { + { + size, err := m.PreemptOnExecutor.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintEvents(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} +func (m *Event_CancelOnExecutor) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Event_CancelOnExecutor) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.CancelOnExecutor != nil { + { + size, err := m.CancelOnExecutor.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintEvents(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } + return len(dAtA) - i, nil +} func (m *ExecutorSettingsUpsert) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -440,6 +639,102 @@ func (m *ExecutorSettingsDelete) MarshalToSizedBuffer(dAtA []byte) (int, error) return len(dAtA) - i, nil } +func (m *PreemptOnExecutor) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PreemptOnExecutor) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PreemptOnExecutor) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.PriorityClasses) > 0 { + for iNdEx := len(m.PriorityClasses) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.PriorityClasses[iNdEx]) + copy(dAtA[i:], m.PriorityClasses[iNdEx]) + i = encodeVarintEvents(dAtA, i, uint64(len(m.PriorityClasses[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } + if len(m.Queues) > 0 { + for iNdEx := len(m.Queues) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Queues[iNdEx]) + copy(dAtA[i:], m.Queues[iNdEx]) + i = encodeVarintEvents(dAtA, i, uint64(len(m.Queues[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintEvents(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *CancelOnExecutor) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CancelOnExecutor) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CancelOnExecutor) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.PriorityClasses) > 0 { + for iNdEx := len(m.PriorityClasses) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.PriorityClasses[iNdEx]) + copy(dAtA[i:], m.PriorityClasses[iNdEx]) + i = encodeVarintEvents(dAtA, i, uint64(len(m.PriorityClasses[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } + if len(m.Queues) > 0 { + for iNdEx := len(m.Queues) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Queues[iNdEx]) + copy(dAtA[i:], m.Queues[iNdEx]) + i = encodeVarintEvents(dAtA, i, uint64(len(m.Queues[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintEvents(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { offset -= sovEvents(v) base := offset @@ -491,6 +786,30 @@ func (m *Event_ExecutorSettingsDelete) Size() (n int) { } return n } +func (m *Event_PreemptOnExecutor) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.PreemptOnExecutor != nil { + l = m.PreemptOnExecutor.Size() + n += 1 + l + sovEvents(uint64(l)) + } + return n +} +func (m *Event_CancelOnExecutor) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.CancelOnExecutor != nil { + l = m.CancelOnExecutor.Size() + n += 1 + l + sovEvents(uint64(l)) + } + return n +} func (m *ExecutorSettingsUpsert) Size() (n int) { if m == nil { return 0 @@ -528,6 +847,56 @@ func (m *ExecutorSettingsDelete) Size() (n int) { return n } +func (m *PreemptOnExecutor) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if len(m.Queues) > 0 { + for _, s := range m.Queues { + l = len(s) + n += 1 + l + sovEvents(uint64(l)) + } + } + if len(m.PriorityClasses) > 0 { + for _, s := range m.PriorityClasses { + l = len(s) + n += 1 + l + sovEvents(uint64(l)) + } + } + return n +} + +func (m *CancelOnExecutor) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if len(m.Queues) > 0 { + for _, s := range m.Queues { + l = len(s) + n += 1 + l + sovEvents(uint64(l)) + } + } + if len(m.PriorityClasses) > 0 { + for _, s := range m.PriorityClasses { + l = len(s) + n += 1 + l + sovEvents(uint64(l)) + } + } + return n +} + func sovEvents(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -669,17 +1038,87 @@ func (m *Event) Unmarshal(dAtA []byte) error { } m.Event = &Event_ExecutorSettingsDelete{v} iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipEvents(dAtA[iNdEx:]) - if err != nil { - return err + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PreemptOnExecutor", wireType) } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthEvents + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &PreemptOnExecutor{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &Event_PreemptOnExecutor{v} + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CancelOnExecutor", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &CancelOnExecutor{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &Event_CancelOnExecutor{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF } iNdEx += skippy } @@ -938,6 +1377,298 @@ func (m *ExecutorSettingsDelete) Unmarshal(dAtA []byte) error { } return nil } +func (m *PreemptOnExecutor) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PreemptOnExecutor: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PreemptOnExecutor: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Queues", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Queues = append(m.Queues, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PriorityClasses", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PriorityClasses = append(m.PriorityClasses, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *CancelOnExecutor) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CancelOnExecutor: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CancelOnExecutor: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Queues", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Queues = append(m.Queues, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PriorityClasses", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PriorityClasses = append(m.PriorityClasses, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipEvents(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/controlplaneevents/events.proto b/pkg/controlplaneevents/events.proto index 7241f56763d..5c5c3e6d0b3 100644 --- a/pkg/controlplaneevents/events.proto +++ b/pkg/controlplaneevents/events.proto @@ -11,6 +11,8 @@ message Event { oneof event { ExecutorSettingsUpsert executorSettingsUpsert = 2; ExecutorSettingsDelete executorSettingsDelete = 3; + PreemptOnExecutor preemptOnExecutor = 4; + CancelOnExecutor cancelOnExecutor = 5; } } @@ -24,3 +26,15 @@ message ExecutorSettingsUpsert { message ExecutorSettingsDelete { string name = 1; } + +message PreemptOnExecutor { + string name = 1; + repeated string queues = 2; + repeated string priorityClasses = 3; +} + +message CancelOnExecutor { + string name = 1; + repeated string queues = 2; + repeated string priorityClasses = 3; +}