Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: submit checks: add queue fraction limits/floating resources #4074

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/scheduler/internaltypes/resource_list_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ func (factory *ResourceListFactory) MakeAllZero() ResourceList {
return ResourceList{resources: result, factory: factory}
}

func (factory *ResourceListFactory) MakeAllMax() ResourceList {
result := make([]int64, len(factory.indexToName))
for i := range result {
result[i] = math.MaxInt64
}
return ResourceList{resources: result, factory: factory}
}

// Ignore unknown resources, round down.
func (factory *ResourceListFactory) FromNodeProto(resources map[string]k8sResource.Quantity) ResourceList {
result := make([]int64, len(factory.indexToName))
Expand Down
16 changes: 16 additions & 0 deletions internal/scheduler/internaltypes/resource_list_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ func TestGetScaleFailsOnUnknown(t *testing.T) {
assert.NotNil(t, err)
}

func TestMakeAllZero(t *testing.T) {
factory := testFactory()
allZero := factory.MakeAllZero()
assert.False(t, allZero.IsEmpty())
assert.True(t, allZero.AllZero())
}

func TestMakeAllMax(t *testing.T) {
factory := testFactory()
allMax := factory.MakeAllMax()
assert.False(t, allMax.IsEmpty())
for _, res := range allMax.GetResources() {
assert.Equal(t, int64(math.MaxInt64), res.RawValue)
}
}

func testFactory() *ResourceListFactory {
factory, _ := NewResourceListFactory(
[]configuration.ResourceType{
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ func Run(config schedulerconfig.Configuration) error {
submitChecker := NewSubmitChecker(
config.Scheduling,
executorRepository,
queueCache,
floatingResourceTypes,
resourceListFactory,
)
services = append(services, func() error {
Expand Down
13 changes: 10 additions & 3 deletions internal/scheduler/scheduling/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type SchedulingConstraints interface {
CheckRoundConstraints(sctx *context.SchedulingContext) (bool, string, error)
CheckJobConstraints(sctx *context.SchedulingContext, gctx *context.GangSchedulingContext) (bool, string, error)
GetQueueResourceLimit(queueName string, priorityClassName string) internaltypes.ResourceList
CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList
}

Expand Down Expand Up @@ -139,16 +140,22 @@ func (constraints *schedulingConstraints) CheckJobConstraints(
return false, QueueRateLimitExceededByGangUnschedulableReason, nil
}

// Quantity scheduled by queue and priority class
queueLimit, haslimit := constraints.resourceLimitsPerQueuePerPriorityClass[qctx.Queue][gctx.PriorityClassName]
queueLimit := constraints.GetQueueResourceLimit(qctx.Queue, gctx.PriorityClassName)
allocatedResources := qctx.AllocatedByPriorityClass[gctx.PriorityClassName]
if haslimit && allocatedResources.Exceeds(queueLimit) {
if !queueLimit.IsEmpty() && allocatedResources.Exceeds(queueLimit) {
return false, UnschedulableReasonMaximumResourcesExceeded, nil
}

return true, "", nil
}

func (constraints *schedulingConstraints) GetQueueResourceLimit(
queueName string,
priorityClassName string,
) internaltypes.ResourceList {
return constraints.resourceLimitsPerQueuePerPriorityClass[queueName][priorityClassName]
}

func (c *schedulingConstraints) CapResources(queue string, resourcesByPc map[string]internaltypes.ResourceList) map[string]internaltypes.ResourceList {
perQueueLimit, ok := c.resourceLimitsPerQueuePerPriorityClass[queue]
if !ok {
Expand Down
70 changes: 50 additions & 20 deletions internal/scheduler/scheduling/constraints/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ type constraintTest struct {
priorityClassName string
expectedCheckRoundConstraintsReason string
expectedCheckConstraintsReason string
expectedQueueResourceLimit internaltypes.ResourceList
}

func TestConstraints(t *testing.T) {
rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{
{Name: "cpu"},
{Name: "memory"},
{Name: "a"},
{Name: "b"},
{Name: "c"},
{Name: "d"},
}, nil)
assert.Nil(t, err)

Expand All @@ -43,15 +40,20 @@ func TestConstraints(t *testing.T) {
NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{{Name: "queue-1"}},
), rlFactory),
), rlFactory).withExpectedQueueResourceLimit(rlFactory.MakeAllMax()),
"empty-queue-constraints": makeConstraintsTest(
NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}}), rlFactory),
"within-constraints": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.9, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}), rlFactory),
[]*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}}), rlFactory).withExpectedQueueResourceLimit(rlFactory.MakeAllMax()),
"within-constraints": makeConstraintsTest(
NewSchedulingConstraints(
"pool-1", makeResourceList(rlFactory, "1000", "1000Gi"),
configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.9, "memory": 0.9}}}},
},
[]*api.Queue{{Name: "queue-1", Cordoned: false, ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}},
), rlFactory).withExpectedQueueResourceLimit(makeResourceList(rlFactory, "900", "900Gi")),
"exceeds-queue-priority-class-constraint": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), makeSchedulingConfig(), []*api.Queue{
{
Expand All @@ -65,6 +67,7 @@ func TestConstraints(t *testing.T) {
},
}), rlFactory)
t.expectedCheckConstraintsReason = "resource limit exceeded"
t.expectedQueueResourceLimit = makeResourceList(rlFactory, "1m", "900Gi")
return t
}(),
"exceeds-queue-priority-class-pool-constraint": func() *constraintTest {
Expand All @@ -84,6 +87,7 @@ func TestConstraints(t *testing.T) {
},
}), rlFactory)
t.expectedCheckConstraintsReason = "resource limit exceeded"
t.expectedQueueResourceLimit = makeResourceList(rlFactory, "1m", "900Gi")
return t
}(),
"exceeds-priority-class-constraint": func() *constraintTest {
Expand All @@ -92,41 +96,41 @@ func TestConstraints(t *testing.T) {
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1"}}), rlFactory)
t.expectedCheckConstraintsReason = "resource limit exceeded"
t.expectedQueueResourceLimit = makeResourceList(rlFactory, "10n", "900Gi")
return t
}(),
"priority-class-constraint-ignored-if-there-is-a-queue-constraint": makeConstraintsTest(NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
}, []*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}), rlFactory),
"priority-class-constraint-ignored-if-there-is-a-queue-constraint": makeConstraintsTest(
NewSchedulingConstraints("pool-1", makeResourceList(rlFactory, "1000", "1000Gi"), configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
},
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}}),
rlFactory,
).withExpectedQueueResourceLimit(makeResourceList(rlFactory, "900", "900Gi")),
"one-constraint-per-level-falls-back-as-expected--within-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
"",
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--a-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("101"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--b-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("21"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--c-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("3.1"), "d": resource.MustParse("0.39")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
"one-constraint-per-level-falls-back-as-expected--d-exceeds-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.41")},
UnschedulableReasonMaximumResourcesExceeded,
"",
rlFactory,
),
}
for name, tc := range tests {
Expand All @@ -140,6 +144,8 @@ func TestConstraints(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tc.expectedCheckConstraintsReason == "", ok)
require.Equal(t, tc.expectedCheckConstraintsReason, unscheduledReason)

require.Equal(t, tc.expectedQueueResourceLimit.String(), tc.constraints.GetQueueResourceLimit(tc.gctx.Queue, tc.gctx.PriorityClassName).String())
})
}
}
Expand Down Expand Up @@ -259,7 +265,18 @@ func TestCapResources(t *testing.T) {
}
}

func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, expectedCheckConstraintsReason string, expectedCheckRoundConstraintsReason string, rlFactory *internaltypes.ResourceListFactory) *constraintTest {
func makeMultiLevelConstraintsTest(
requirements map[string]resource.Quantity,
expectedCheckConstraintsReason string,
expectedCheckRoundConstraintsReason string,
) *constraintTest {
rlFactory, _ := internaltypes.NewResourceListFactory([]configuration.ResourceType{
{Name: "a"},
{Name: "b"},
{Name: "c"},
{Name: "d"},
}, nil)

rr := rlFactory.FromJobResourceListIgnoreUnknown(requirements)
return &constraintTest{
constraints: makeMultiLevelConstraints(rlFactory),
Expand Down Expand Up @@ -293,6 +310,13 @@ func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, ex
priorityClassName: "priority-class-1",
expectedCheckConstraintsReason: expectedCheckConstraintsReason,
expectedCheckRoundConstraintsReason: expectedCheckRoundConstraintsReason,
expectedQueueResourceLimit: rlFactory.FromJobResourceListIgnoreUnknown(
map[string]resource.Quantity{
"a": resource.MustParse("100"),
"b": resource.MustParse("20"),
"c": resource.MustParse("3"),
"d": resource.MustParse("0.4"),
}),
}
}

Expand Down Expand Up @@ -370,9 +394,15 @@ func makeConstraintsTest(constraints SchedulingConstraints, rlFactory *internalt
priorityClassName: "priority-class-1",
expectedCheckConstraintsReason: "",
expectedCheckRoundConstraintsReason: "",
expectedQueueResourceLimit: internaltypes.ResourceList{},
}
}

func (ct *constraintTest) withExpectedQueueResourceLimit(limit internaltypes.ResourceList) *constraintTest {
ct.expectedQueueResourceLimit = limit
return ct
}

func makeSchedulingConfig() configuration.SchedulingConfig {
return configuration.SchedulingConfig{
MaximumResourceFractionToSchedule: map[string]float64{"cpu": 0.1, "memory": 0.1},
Expand Down
Loading
Loading