Skip to content

Commit

Permalink
Scheduler: submit checks: add queue fraction limits/floating resources (
Browse files Browse the repository at this point in the history
#4074)

Signed-off-by: Robert Smith <[email protected]>
  • Loading branch information
robertdavidsmith authored Dec 4, 2024
1 parent 7a76f24 commit a415731
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 34 deletions.
8 changes: 8 additions & 0 deletions internal/scheduler/internaltypes/resource_list_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ func (factory *ResourceListFactory) MakeAllZero() ResourceList {
return ResourceList{resources: result, factory: factory}
}

// MakeAllMax returns a ResourceList tied to this factory in which every
// resource slot (one per entry in factory.indexToName) is set to
// math.MaxInt64.
func (factory *ResourceListFactory) MakeAllMax() ResourceList {
	maxed := make([]int64, len(factory.indexToName))
	for idx := 0; idx < len(maxed); idx++ {
		maxed[idx] = math.MaxInt64
	}
	return ResourceList{factory: factory, resources: maxed}
}

// Ignore unknown resources, round down.
func (factory *ResourceListFactory) FromNodeProto(resources map[string]k8sResource.Quantity) ResourceList {
result := make([]int64, len(factory.indexToName))
Expand Down
16 changes: 16 additions & 0 deletions internal/scheduler/internaltypes/resource_list_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ func TestGetScaleFailsOnUnknown(t *testing.T) {
assert.NotNil(t, err)
}

// TestMakeAllZero verifies that the list produced by MakeAllZero is not
// reported as empty and is reported as all-zero.
func TestMakeAllZero(t *testing.T) {
	zeroed := testFactory().MakeAllZero()

	assert.False(t, zeroed.IsEmpty())
	assert.True(t, zeroed.AllZero())
}

// TestMakeAllMax verifies that the list produced by MakeAllMax is not reported
// as empty and that every resource it exposes has a raw value of
// math.MaxInt64.
func TestMakeAllMax(t *testing.T) {
	maxed := testFactory().MakeAllMax()
	assert.False(t, maxed.IsEmpty())

	const want = int64(math.MaxInt64)
	for _, entry := range maxed.GetResources() {
		assert.Equal(t, want, entry.RawValue)
	}
}

func testFactory() *ResourceListFactory {
factory, _ := NewResourceListFactory(
[]configuration.ResourceType{
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ func Run(config schedulerconfig.Configuration) error {
submitChecker := NewSubmitChecker(
config.Scheduling,
executorRepository,
queueCache,
floatingResourceTypes,
resourceListFactory,
)
services = append(services, func() error {
Expand Down
13 changes: 10 additions & 3 deletions internal/scheduler/scheduling/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// SchedulingConstraints groups round-level and job-level constraint checks
// with per-queue resource limit lookups.
type SchedulingConstraints interface {
// CheckRoundConstraints evaluates the scheduling round described by sctx.
// NOTE(review): the implementation is not visible here; presumably the bool
// reports whether scheduling may continue and the string carries the reason
// when it may not — confirm.
CheckRoundConstraints(sctx *context.SchedulingContext) (bool, string, error)
// CheckJobConstraints evaluates the gang described by gctx. The implementation
// in this file returns (false, reason, nil) when a check fails, for example
// when the queue's allocation for the gang's priority class exceeds its limit,
// and (true, "", nil) when every check passes.
CheckJobConstraints(sctx *context.SchedulingContext, gctx *context.GangSchedulingContext) (bool, string, error)
// GetQueueResourceLimit returns the resource limit recorded for the given
// queue and priority class.
GetQueueResourceLimit(queueName string, priorityClassName string) internaltypes.ResourceList
// CapResources takes resources keyed by priority class name for the given
// queue and returns a map keyed the same way.
// NOTE(review): the capping logic is not visible here; presumably each entry
// is limited to the queue's per-priority-class limit — confirm.
CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList
}

Expand Down Expand Up @@ -139,16 +140,22 @@ func (constraints *schedulingConstraints) CheckJobConstraints(
return false, QueueRateLimitExceededByGangUnschedulableReason, nil
}

// Quantity scheduled by queue and priority class
queueLimit, haslimit := constraints.resourceLimitsPerQueuePerPriorityClass[qctx.Queue][gctx.PriorityClassName]
queueLimit := constraints.GetQueueResourceLimit(qctx.Queue, gctx.PriorityClassName)
allocatedResources := qctx.AllocatedByPriorityClass[gctx.PriorityClassName]
if haslimit && allocatedResources.Exceeds(queueLimit) {
if !queueLimit.IsEmpty() && allocatedResources.Exceeds(queueLimit) {
return false, UnschedulableReasonMaximumResourcesExceeded, nil
}

return true, "", nil
}

// GetQueueResourceLimit looks up the resource limit stored for queueName and
// priorityClassName in resourceLimitsPerQueuePerPriorityClass. If either key
// is absent, the lookup yields the zero-value ResourceList.
func (constraints *schedulingConstraints) GetQueueResourceLimit(
	queueName string,
	priorityClassName string,
) internaltypes.ResourceList {
	limitsByPriorityClass := constraints.resourceLimitsPerQueuePerPriorityClass[queueName]
	return limitsByPriorityClass[priorityClassName]
}

func (c *schedulingConstraints) CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList {
perQueueLimit, ok := c.resourceLimitsPerQueuePerPriorityClass[queue]
if !ok {
Expand Down
70 changes: 50 additions & 20 deletions internal/scheduler/scheduling/constraints/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ type constraintTest struct {
priorityClassName string
expectedCheckRoundConstraintsReason string
expectedCheckConstraintsReason string
expectedQueueResourceLimit internaltypes.ResourceList
}

func TestConstraints(t *testing.T) {
rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{
{Name: "cpu"},
{Name: "memory"},
{Name: "a"},
{Name: "b"},
{Name: "c"},
{Name: "d"},
}, nil)
assert.Nil(t, err)

Expand All @@ -43,15 +40,20 @@ func TestConstraints(t *testing.T) {
NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{{Name: "queue-1"}},
), rlFactory),
), rlFactory).withExpectedQueueResourceLimit(rlFactory.MakeAllMax()),
"empty-queue-constraints": makeConstraintsTest(
NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}}), rlFactory),
"within-constraints": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.9, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}), rlFactory),
[]*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}}), rlFactory).withExpectedQueueResourceLimit(rlFactory.MakeAllMax()),
"within-constraints": makeConstraintsTest(
NewSchedulingConstraints(
"pool-1", makeResourceList(rlFactory, "1000", "1000Gi"),
configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.9, "memory": 0.9}}}},
},
[]*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}},
), rlFactory).withExpectedQueueResourceLimit(makeResourceList(rlFactory, "900", "900Gi")),
"exceeds-queue-priority-class-constraint": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{
{
Expand All @@ -65,6 +67,7 @@ func TestConstraints(t *testing.T) {
},
}), rlFactory)
t.expectedCheckConstraintsReason = "resource limit exceeded"
t.expectedQueueResourceLimit = makeResourceList(rlFactory, "1m", "900Gi")
return t
}(),
"exceeds-queue-priority-class-pool-constraint": func() *constraintTest {
Expand All @@ -84,6 +87,7 @@ func TestConstraints(t *testing.T) {
},
}), rlFactory)
t.expectedCheckConstraintsReason = "resource limit exceeded"
t.expectedQueueResourceLimit = makeResourceList(rlFactory, "1m", "900Gi")
return t
}(),
"exceeds-priority-class-constraint": func() *constraintTest {
Expand All @@ -92,41 +96,41 @@ func TestConstraints(t *testing.T) {
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1"}}), rlFactory)
t.expectedCheckConstraintsReason = "resource limit exceeded"
t.expectedQueueResourceLimit = makeResourceList(rlFactory, "10n", "900Gi")
return t
}(),
"priority-class-constraint-ignored-if-there-is-a-queue-constraint": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}), rlFactory),
"priority-class-constraint-ignored-if-there-is-a-queue-constraint": makeConstraintsTest(
NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
},
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}),
rlFactory,
).withExpectedQueueResourceLimit(makeResourceList(rlFactory, "900", "900Gi")),
"one-constraint-per-level-falls-back-as-expected--within-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
"",
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--a-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("101"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--b-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("21"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--c-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("3.1"), "d": resource.MustParse("0.39")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--d-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.41")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
}
for name, tc := range tests {
Expand All @@ -140,6 +144,8 @@ func TestConstraints(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tc.expectedCheckConstraintsReason == "", ok)
require.Equal(t, tc.expectedCheckConstraintsReason, unscheduledReason)

require.Equal(t, tc.expectedQueueResourceLimit.String(), tc.constraints.GetQueueResourceLimit(tc.gctx.Queue, tc.gctx.PriorityClassName).String())
})
}
}
Expand Down Expand Up @@ -259,7 +265,18 @@ func TestCapResources(t *testing.T) {
}
}

func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, expectedCheckConstraintsReason string, expectedCheckRoundConstraintsReason string, rlFactory *internaltypes.ResourceListFactory) *constraintTest {
func makeMultiLevelConstraintsTest(
requirements map[string]resource.Quantity,
expectedCheckConstraintsReason string,
expectedCheckRoundConstraintsReason string,
) *constraintTest {
rlFactory, _ := internaltypes.NewResourceListFactory([]configuration.ResourceType{
{Name: "a"},
{Name: "b"},
{Name: "c"},
{Name: "d"},
}, nil)

rr := rlFactory.FromJobResourceListIgnoreUnknown(requirements)
return &constraintTest{
constraints: makeMultiLevelConstraints(rlFactory),
Expand Down Expand Up @@ -293,6 +310,13 @@ func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, ex
priorityClassName: "priority-class-1",
expectedCheckConstraintsReason: expectedCheckConstraintsReason,
expectedCheckRoundConstraintsReason: expectedCheckRoundConstraintsReason,
expectedQueueResourceLimit: rlFactory.FromJobResourceListIgnoreUnknown(
map[string]resource.Quantity{
"a": resource.MustParse("100"),
"b": resource.MustParse("20"),
"c": resource.MustParse("3"),
"d": resource.MustParse("0.4"),
}),
}
}

Expand Down Expand Up @@ -370,9 +394,15 @@ func makeConstraintsTest(constraints SchedulingConstraints, rlFactory *internalt
priorityClassName: "priority-class-1",
expectedCheckConstraintsReason: "",
expectedCheckRoundConstraintsReason: "",
expectedQueueResourceLimit: internaltypes.ResourceList{},
}
}

// withExpectedQueueResourceLimit records limit as the queue resource limit this
// test case expects and returns the same *constraintTest, so the call can be
// chained onto the expression that builds the test case.
func (ct *constraintTest) withExpectedQueueResourceLimit(limit internaltypes.ResourceList) *constraintTest {
ct.expectedQueueResourceLimit = limit
return ct
}

func makeSchedulingConfig() configuration.SchedulingConfig {
return configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
Expand Down
Loading

0 comments on commit a415731

Please sign in to comment.