Skip to content

Commit

Permalink
Cordon queue (#183) (#3851)
Browse files Browse the repository at this point in the history
* Dropping queues table

* Plumbing in cordoning mechanism

* Adding tests

* Generating proto with the correct version

* Removing method name from queue_service error messages

* Moving rate-limit comment

* Removing swagger diffs

Co-authored-by: Mustafa Ilyas <[email protected]>
  • Loading branch information
MustafaI and mustafai-gr authored Aug 1, 2024
1 parent 688302c commit e41e814
Show file tree
Hide file tree
Showing 23 changed files with 1,256 additions and 429 deletions.
28 changes: 28 additions & 0 deletions internal/armada/mocks/mock_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/armada/permissions/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ const (
WatchAllEvents = "watch_all_events"
CreateQueue = "create_queue"
DeleteQueue = "delete_queue"
CordonQueue = "cordon_queue"
CordonNodes = "cordon_nodes"
)
20 changes: 20 additions & 0 deletions internal/armada/queue/queue_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type QueueRepository interface {
CreateQueue(*armadacontext.Context, queue.Queue) error
UpdateQueue(*armadacontext.Context, queue.Queue) error
DeleteQueue(ctx *armadacontext.Context, name string) error
CordonQueue(ctx *armadacontext.Context, name string) error
UncordonQueue(ctx *armadacontext.Context, name string) error
}

type ReadOnlyQueueRepository interface {
Expand Down Expand Up @@ -116,6 +118,24 @@ func (r *PostgresQueueRepository) DeleteQueue(ctx *armadacontext.Context, name s
return nil
}

// CordonQueue marks the named queue as cordoned and persists the change.
//
// It loads the current queue via GetQueue, sets its Cordoned flag, and writes
// the record back through upsertQueue. An error from either step is returned
// unchanged; if the lookup fails, nothing is written.
func (r *PostgresQueueRepository) CordonQueue(ctx *armadacontext.Context, name string) error {
	q, err := r.GetQueue(ctx, name)
	if err == nil {
		q.Cordoned = true
		err = r.upsertQueue(ctx, q)
	}
	return err
}

// UncordonQueue clears the cordoned flag on the named queue and persists the change.
//
// It loads the current queue via GetQueue, resets its Cordoned flag, and writes
// the record back through upsertQueue. An error from either step is returned
// unchanged; if the lookup fails, nothing is written.
func (r *PostgresQueueRepository) UncordonQueue(ctx *armadacontext.Context, name string) error {
	q, err := r.GetQueue(ctx, name)
	if err == nil {
		q.Cordoned = false
		err = r.upsertQueue(ctx, q)
	}
	return err
}

func (r *PostgresQueueRepository) upsertQueue(ctx *armadacontext.Context, queue queue.Queue) error {
data, err := proto.Marshal(queue.ToAPI())
if err != nil {
Expand Down
69 changes: 54 additions & 15 deletions internal/armada/queue/queue_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queue

import (
"context"
"fmt"
"math"

"github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -37,9 +38,9 @@ func (s *Server) CreateQueue(grpcCtx context.Context, req *api.Queue) (*types.Em
err := s.authorizer.AuthorizeAction(ctx, permissions.CreateQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "[CreateQueue] error creating queue %s: %s", req.Name, ep)
return nil, status.Errorf(codes.PermissionDenied, "error creating queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[CreateQueue] error checking permissions: %s", err)
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}

if len(req.UserOwners) == 0 {
Expand All @@ -49,15 +50,15 @@ func (s *Server) CreateQueue(grpcCtx context.Context, req *api.Queue) (*types.Em

queue, err := queue.NewQueue(req)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[CreateQueue] error validating queue: %s", err)
return nil, status.Errorf(codes.InvalidArgument, "error validating queue: %s", err)
}

err = s.queueRepository.CreateQueue(ctx, queue)
var eq *ErrQueueAlreadyExists
if errors.As(err, &eq) {
return nil, status.Errorf(codes.AlreadyExists, "[CreateQueue] error creating queue: %s", err)
return nil, status.Errorf(codes.AlreadyExists, "error creating queue: %s", err)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[CreateQueue] error creating queue: %s", err)
return nil, status.Errorf(codes.Unavailable, "error creating queue: %s", err)
}

return &types.Empty{}, nil
Expand Down Expand Up @@ -87,22 +88,22 @@ func (s *Server) UpdateQueue(grpcCtx context.Context, req *api.Queue) (*types.Em
err := s.authorizer.AuthorizeAction(ctx, permissions.CreateQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "[UpdateQueue] error updating queue %s: %s", req.Name, ep)
return nil, status.Errorf(codes.PermissionDenied, "error updating queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[UpdateQueue] error checking permissions: %s", err)
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}

queue, err := queue.NewQueue(req)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[UpdateQueue] error: %s", err)
return nil, status.Errorf(codes.InvalidArgument, "error: %s", err)
}

err = s.queueRepository.UpdateQueue(ctx, queue)
var e *ErrQueueNotFound
if errors.As(err, &e) {
return nil, status.Errorf(codes.NotFound, "[UpdateQueue] error: %s", err)
return nil, status.Errorf(codes.NotFound, "error: %s", err)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[UpdateQueue] error getting queue %q: %s", queue.Name, err)
return nil, status.Errorf(codes.Unavailable, "error getting queue %q: %s", queue.Name, err)
}

return &types.Empty{}, nil
Expand Down Expand Up @@ -133,13 +134,13 @@ func (s *Server) DeleteQueue(grpcCtx context.Context, req *api.QueueDeleteReques
err := s.authorizer.AuthorizeAction(ctx, permissions.DeleteQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "[DeleteQueue] error deleting queue %s: %s", req.Name, ep)
return nil, status.Errorf(codes.PermissionDenied, "error deleting queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[DeleteQueue] error checking permissions: %s", err)
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}
err = s.queueRepository.DeleteQueue(ctx, req.Name)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[DeleteQueue] error deleting queue %s: %s", req.Name, err)
return nil, status.Errorf(codes.InvalidArgument, "error deleting queue %s: %s", req.Name, err)
}
return &types.Empty{}, nil
}
Expand All @@ -149,9 +150,9 @@ func (s *Server) GetQueue(grpcCtx context.Context, req *api.QueueGetRequest) (*a
queue, err := s.queueRepository.GetQueue(ctx, req.Name)
var e *ErrQueueNotFound
if errors.As(err, &e) {
return nil, status.Errorf(codes.NotFound, "[GetQueue] error: %s", err)
return nil, status.Errorf(codes.NotFound, "error: %s", err)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[GetQueue] error getting queue %q: %s", req.Name, err)
return nil, status.Errorf(codes.Unavailable, "error getting queue %q: %s", req.Name, err)
}
return queue.ToAPI(), nil
}
Expand Down Expand Up @@ -189,3 +190,41 @@ func (s *Server) GetQueues(req *api.StreamingQueueGetRequest, stream api.QueueSe
}
return nil
}

// CordonQueue handles the gRPC request to cordon the queue named in req.
//
// The caller must hold the CordonQueue permission. Failures are reported as
// gRPC status errors, matching the other handlers in this file:
//   - PermissionDenied if the caller is not authorized,
//   - Unavailable if the permission check or the repository update fails,
//   - InvalidArgument if req.Name is empty,
//   - NotFound if the repository reports that the queue does not exist.
//
// On success it returns an empty response; on any error the response is nil.
func (s *Server) CordonQueue(grpcCtx context.Context, req *api.QueueCordonRequest) (*types.Empty, error) {
	ctx := armadacontext.FromGrpcCtx(grpcCtx)

	if err := s.authorizer.AuthorizeAction(ctx, permissions.CordonQueue); err != nil {
		var ep *armadaerrors.ErrUnauthorized
		if errors.As(err, &ep) {
			return nil, status.Errorf(codes.PermissionDenied, "error cordoning queue %s: %s", req.Name, ep)
		}
		return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
	}

	queueName := req.Name
	if queueName == "" {
		// A plain error would surface to clients as codes.Unknown; this is a
		// caller mistake, so report it as InvalidArgument.
		return nil, status.Error(codes.InvalidArgument, "cannot cordon queue with empty name")
	}

	if err := s.queueRepository.CordonQueue(ctx, queueName); err != nil {
		// Build the message once; only the status code depends on the failure kind.
		wrapped := fmt.Errorf("error cordoning queue %q: %w", queueName, err)
		var notFound *ErrQueueNotFound
		if errors.As(err, &notFound) {
			return nil, status.Error(codes.NotFound, wrapped.Error())
		}
		return nil, status.Error(codes.Unavailable, wrapped.Error())
	}

	return &types.Empty{}, nil
}

// UncordonQueue handles the gRPC request to uncordon the queue named in req.
//
// The caller must hold the CordonQueue permission (the same permission guards
// both cordoning and uncordoning). Failures are reported as gRPC status
// errors, matching the other handlers in this file:
//   - PermissionDenied if the caller is not authorized,
//   - Unavailable if the permission check or the repository update fails,
//   - InvalidArgument if req.Name is empty,
//   - NotFound if the repository reports that the queue does not exist.
//
// On success it returns an empty response; on any error the response is nil.
func (s *Server) UncordonQueue(grpcCtx context.Context, req *api.QueueUncordonRequest) (*types.Empty, error) {
	ctx := armadacontext.FromGrpcCtx(grpcCtx)

	if err := s.authorizer.AuthorizeAction(ctx, permissions.CordonQueue); err != nil {
		var ep *armadaerrors.ErrUnauthorized
		if errors.As(err, &ep) {
			return nil, status.Errorf(codes.PermissionDenied, "error uncordoning queue %s: %s", req.Name, ep)
		}
		return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
	}

	queueName := req.Name
	if queueName == "" {
		// A plain error would surface to clients as codes.Unknown; this is a
		// caller mistake, so report it as InvalidArgument.
		return nil, status.Error(codes.InvalidArgument, "cannot uncordon queue with empty name")
	}

	if err := s.queueRepository.UncordonQueue(ctx, queueName); err != nil {
		// Build the message once; only the status code depends on the failure kind.
		wrapped := fmt.Errorf("error uncordoning queue %q: %w", queueName, err)
		var notFound *ErrQueueNotFound
		if errors.As(err, &notFound) {
			return nil, status.Error(codes.NotFound, wrapped.Error())
		}
		return nil, status.Error(codes.Unavailable, wrapped.Error())
	}

	return &types.Empty{}, nil
}
11 changes: 7 additions & 4 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ type Params struct {
// However, they are user-replaceable to facilitate testing.
// TODO Consider replacing with an interface
type QueueAPI struct {
Create queue.CreateAPI
Delete queue.DeleteAPI
Get queue.GetAPI
Update queue.UpdateAPI
Create queue.CreateAPI
Delete queue.DeleteAPI
Get queue.GetAPI
GetAll queue.GetAllAPI
Update queue.UpdateAPI
Cordon queue.CordonAPI
Uncordon queue.UncordonAPI
}

// New instantiates an App with default parameters, including standard output
Expand Down
22 changes: 11 additions & 11 deletions internal/scheduler/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
// Indicates that the scheduling rate limit has been exceeded.
GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded"
QueueRateLimitExceededUnschedulableReason = "queue scheduling rate limit exceeded"
SchedulingPausedOnQueueUnschedulableReason = "scheduling paused on queue"

// Indicates that scheduling a gang would exceed the rate limit.
GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit"
Expand Down Expand Up @@ -54,7 +55,7 @@ func IsTerminalUnschedulableReason(reason string) bool {
// IsTerminalQueueUnschedulableReason returns true if reason indicates
// it's not possible to schedule any more jobs from this queue in this round.
func IsTerminalQueueUnschedulableReason(reason string) bool {
return reason == QueueRateLimitExceededUnschedulableReason
return reason == QueueRateLimitExceededUnschedulableReason || reason == SchedulingPausedOnQueueUnschedulableReason
}

// SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits.
Expand All @@ -75,6 +76,8 @@ type SchedulingConstraints struct {
type queueSchedulingConstraints struct {
// Scheduling constraints for this queue, keyed by priority class name.
PriorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints
// Cordoned reports whether scheduling has been paused for this queue.
// When true, CheckConstraints rejects the queue with
// SchedulingPausedOnQueueUnschedulableReason.
Cordoned bool
}

// priorityClassSchedulingConstraints contains scheduling constraints that apply to jobs of a specific priority class.
Expand All @@ -84,12 +87,7 @@ type priorityClassSchedulingConstraints struct {
MaximumResourcesPerQueue map[string]resource.Quantity
}

func NewSchedulingConstraints(
pool string,
totalResources schedulerobjects.ResourceList,
config configuration.SchedulingConfig,
queues []*api.Queue,
) SchedulingConstraints {
func NewSchedulingConstraints(pool string, totalResources schedulerobjects.ResourceList, config configuration.SchedulingConfig, queues []*api.Queue, cordonStatusByQueue map[string]bool) SchedulingConstraints {
priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]priorityClassSchedulingConstraints, len(config.PriorityClasses))
for name, priorityClass := range config.PriorityClasses {
maximumResourceFractionPerQueue := priorityClass.MaximumResourceFractionPerQueue
Expand Down Expand Up @@ -117,10 +115,9 @@ func NewSchedulingConstraints(
MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFraction),
}
}
if len(priorityClassSchedulingConstraintsByPriorityClassNameForQueue) > 0 {
queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{
PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue,
}
queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{
PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue,
Cordoned: cordonStatusByQueue[queue.Name],
}
}

Expand Down Expand Up @@ -182,6 +179,9 @@ func (constraints *SchedulingConstraints) CheckConstraints(
return false, GlobalRateLimitExceededByGangUnschedulableReason, nil
}

if queueConstraints, ok := constraints.queueSchedulingConstraintsByQueueName[qctx.Queue]; ok && queueConstraints.Cordoned {
return false, SchedulingPausedOnQueueUnschedulableReason, nil
}
// Per-queue rate limiter check.
tokens = qctx.Limiter.TokensAt(sctx.Started)
if tokens <= 0 {
Expand Down
Loading

0 comments on commit e41e814

Please sign in to comment.