diff --git a/pkg/cache/clusterqueue_snapshot.go b/pkg/cache/clusterqueue_snapshot.go index d039435974c..4c715da9bc2 100644 --- a/pkg/cache/clusterqueue_snapshot.go +++ b/pkg/cache/clusterqueue_snapshot.go @@ -87,6 +87,20 @@ func (c *ClusterQueueSnapshot) Available(fr resources.FlavorResource) int64 { return max(0, capacityAvailable) } +func (c *ClusterQueueSnapshot) PotentialAvailable(fr resources.FlavorResource) int64 { + if c.Cohort == nil { + return c.nominal(fr) + } + potential := c.RequestableCohortQuota(fr) + + // if the borrowing limit exists, we cap our potential capacity by the borrowing limit. + if borrowingLimit := c.borrowingLimit(fr); borrowingLimit != nil { + withBorrowingRemaining := c.nominal(fr) + *borrowingLimit + potential = min(potential, withBorrowingRemaining) + } + return max(0, potential) +} + func (c *ClusterQueueSnapshot) nominal(fr resources.FlavorResource) int64 { if quota := c.QuotaFor(fr); quota != nil { return quota.Nominal diff --git a/pkg/resources/requests.go b/pkg/resources/requests.go index 12d1b747b96..1d58e9678e2 100644 --- a/pkg/resources/requests.go +++ b/pkg/resources/requests.go @@ -67,3 +67,7 @@ func ResourceQuantity(name corev1.ResourceName, v int64) resource.Quantity { return *resource.NewQuantity(v, resource.DecimalSI) } } +func ResourceQuantityString(name corev1.ResourceName, v int64) string { + rq := ResourceQuantity(name, v) + return rq.String() +} diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go index 52a2d06724d..bd74c54985a 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner.go +++ b/pkg/scheduler/flavorassigner/flavorassigner.go @@ -597,57 +597,37 @@ func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.Set[string]) nodeaffi // could help), it returns a Status with reasons. func (a *FlavorAssigner) fitsResourceQuota(log logr.Logger, fr resources.FlavorResource, val int64, rQuota *cache.ResourceQuota) (granularMode, bool, *Status) { var status Status - var borrow bool - used := a.cq.Usage[fr] - mode := noFit - if val <= rQuota.Nominal { - // The request can be satisfied by the nominal quota, assuming quota is - // reclaimed from the cohort or assuming all active workloads in the - // ClusterQueue are preempted. - mode = preempt - } - cohortAvailable := rQuota.Nominal - if a.cq.Cohort != nil { - cohortAvailable = a.cq.RequestableCohortQuota(fr) - } - - if a.canPreemptWhileBorrowing() { - // when preemption with borrowing is enabled, we can succeed to admit the - // workload if preemption is used. - if (rQuota.BorrowingLimit == nil || val <= rQuota.Nominal+*rQuota.BorrowingLimit) && val <= cohortAvailable { - mode = preempt - borrow = val > rQuota.Nominal - } - } - if rQuota.BorrowingLimit != nil && used+val > rQuota.Nominal+*rQuota.BorrowingLimit { - status.append(fmt.Sprintf("borrowing limit for %s in flavor %s exceeded", fr.Resource, fr.Flavor)) - return mode, borrow, &status - } - if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) { - mode = reclaim - } + borrow := a.cq.BorrowingWith(fr, val) && a.cq.Cohort != nil + available := a.cq.Available(fr) + maxCapacity := a.cq.PotentialAvailable(fr) - cohortUsed := used - if a.cq.Cohort != nil { - cohortUsed = a.cq.UsedCohortQuota(fr) + // No Fit + if val > maxCapacity { + status.append(fmt.Sprintf("insufficient quota for %s in flavor %s, request > maximum capacity (%s > %s)", + fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val), resources.ResourceQuantityString(fr.Resource, maxCapacity))) + return noFit, false, &status } - lack := cohortUsed + val - cohortAvailable - if lack <= 0 { - return fit, used+val > rQuota.Nominal, nil + // Fit + if val <= available { + return fit, borrow, nil } - lackQuantity := resources.ResourceQuantity(fr.Resource, lack) - msg := fmt.Sprintf("insufficient unused quota in cohort for %s in flavor %s, %s more needed", fr.Resource, fr.Flavor, &lackQuantity) - if a.cq.Cohort == nil { - if mode == noFit { - msg = fmt.Sprintf("insufficient quota for %s in flavor %s in ClusterQueue", fr.Resource, fr.Flavor) - } else { - msg = fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed", fr.Resource, fr.Flavor, &lackQuantity) + // Check if preemption is possible + mode := noFit + if val <= rQuota.Nominal { + mode = preempt + if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) { + mode = reclaim } + } else if a.canPreemptWhileBorrowing() { + mode = preempt } - status.append(msg) + + status.append(fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed", + fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val-available))) + return mode, borrow, &status } diff --git a/pkg/scheduler/flavorassigner/flavorassigner_test.go b/pkg/scheduler/flavorassigner/flavorassigner_test.go index 96c604800c6..33f0567f888 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner_test.go +++ b/pkg/scheduler/flavorassigner/flavorassigner_test.go @@ -18,7 +18,6 @@ package flavorassigner import ( "context" - "fmt" "testing" "github.com/go-logr/logr" @@ -259,7 +258,7 @@ func TestAssignFlavors(t *testing.T) { }, Status: &Status{ reasons: []string{ - "insufficient quota for memory in flavor b_one in ClusterQueue", + "insufficient quota for memory in flavor b_one, request > maximum capacity (10Mi > 1Mi)", }, }, Count: 1, @@ -373,13 +372,14 @@ func TestAssignFlavors(t *testing.T) { }, Status: &Status{ reasons: []string{ - "insufficient unused quota in cohort for cpu in flavor one, 1 more needed", - "insufficient unused quota in cohort for memory in flavor two, 5Mi more needed", - "insufficient unused quota in cohort for example.com/gpu in flavor b_one, 1 more needed", + "insufficient quota for cpu in flavor one, request > maximum capacity (3 > 2)", + "insufficient unused quota for memory in flavor two, 5Mi more needed", + "insufficient unused quota for example.com/gpu in flavor b_one, 1 more needed", }, }, Count: 1, }}, + Borrowing: true, Usage: resources.FlavorResourceQuantities{ {Flavor: "two", Resource: corev1.ResourceCPU}: 3_000, {Flavor: "two", Resource: corev1.ResourceMemory}: 10 * utiltesting.Mi, @@ -414,8 +414,8 @@ func TestAssignFlavors(t *testing.T) { }, Status: &Status{ reasons: []string{ - "insufficient quota for cpu in flavor one in ClusterQueue", - "insufficient quota for memory in flavor two in ClusterQueue", + "insufficient quota for cpu in flavor one, request > maximum capacity (3 > 2)", + "insufficient quota for memory in flavor two, request > maximum capacity (10Mi > 5Mi)", }, }, Count: 1, @@ -839,7 +839,7 @@ func TestAssignFlavors(t *testing.T) { corev1.ResourceCPU: resource.MustParse("2"), }, Status: &Status{ - reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 1 more needed"}, + reasons: []string{"insufficient unused quota for cpu in flavor one, 1 more needed"}, }, Count: 1, }}, @@ -882,10 +882,11 @@ func TestAssignFlavors(t *testing.T) { }, Status: &Status{ - reasons: []string{"borrowing limit for cpu in flavor one exceeded"}, + reasons: []string{"insufficient unused quota for cpu in flavor one, 1 more needed"}, }, Count: 1, }}, + Borrowing: true, Usage: resources.FlavorResourceQuantities{ {Flavor: "one", Resource: corev1.ResourceCPU}: 2_000, }, @@ -960,10 +961,11 @@ func TestAssignFlavors(t *testing.T) { corev1.ResourceCPU: resource.MustParse("2"), }, Status: &Status{ - reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 2 more needed"}, + reasons: []string{"insufficient unused quota for cpu in flavor one, 2 more needed"}, }, Count: 1, }}, + Borrowing: true, Usage: resources.FlavorResourceQuantities{ {Flavor: "one", Resource: corev1.ResourceCPU}: 2_000, }, @@ -1077,7 +1079,7 @@ func TestAssignFlavors(t *testing.T) { }, Status: &Status{ reasons: []string{ - "insufficient quota for cpu in flavor one in ClusterQueue", + "insufficient quota for cpu in flavor one, request > maximum capacity (12 > 4)", "insufficient unused quota for cpu in flavor tainted, 3 more needed", }, }, @@ -1173,7 +1175,7 @@ func TestAssignFlavors(t *testing.T) { corev1.ResourcePods: resource.MustParse("3"), }, Status: &Status{ - reasons: []string{fmt.Sprintf("insufficient quota for %s in flavor default in ClusterQueue", corev1.ResourcePods)}, + reasons: []string{"insufficient quota for pods in flavor default, request > maximum capacity (3 > 2)"}, }, Count: 3, }}, @@ -1510,7 +1512,7 @@ func TestAssignFlavors(t *testing.T) { corev1.ResourceCPU: {Name: "one", Mode: Preempt, TriedFlavorIdx: 0}, }, Status: &Status{ - reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 10 more needed"}, + reasons: []string{"insufficient unused quota for cpu in flavor one, 10 more needed"}, }, Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("12"), @@ -1565,7 +1567,7 @@ func TestAssignFlavors(t *testing.T) { corev1.ResourceCPU: {Name: "one", Mode: Preempt, TriedFlavorIdx: 0}, }, Status: &Status{ - reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 10 more needed"}, + reasons: []string{"insufficient unused quota for cpu in flavor one, 10 more needed"}, }, Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("12"), @@ -1664,7 +1666,7 @@ func TestAssignFlavors(t *testing.T) { { Name: "main", Status: &Status{ - reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 11 more needed"}, + reasons: []string{"insufficient quota for cpu in flavor one, request > maximum capacity (12 > 11)"}, }, Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("12"), @@ -1821,10 +1823,11 @@ func TestAssignFlavors(t *testing.T) { corev1.ResourcePods: resource.MustParse("1"), }, Status: &Status{ - reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 1 more needed"}, + reasons: []string{"insufficient unused quota for cpu in flavor one, 1 more needed"}, }, Count: 1, }}, + Borrowing: true, Usage: resources.FlavorResourceQuantities{ {Flavor: "one", Resource: corev1.ResourceCPU}: 9_000, {Flavor: "one", Resource: corev1.ResourcePods}: 1, @@ -1868,7 +1871,7 @@ func TestAssignFlavors(t *testing.T) { corev1.ResourceCPU: {Name: "one", Mode: Preempt, TriedFlavorIdx: 0}, }, Status: &Status{ - reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 10 more needed"}, + reasons: []string{"insufficient unused quota for cpu in flavor one, 10 more needed"}, }, Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("12"), diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 92065696bd0..18ce6a43329 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -792,6 +792,13 @@ func TestSchedule(t *testing.T) { }, }, "multiple CQs need preemption": { + // legacy mode considers eng-alpha/pending to + // be left in queue, rather than inadmissible, + // due to legacy overlapping preemption skip + // logic. Since we are deleting legacy code in + // 0.10, we will disable this test for it, + // rather than duplicate it. + multiplePreemptions: MultiplePremptions, additionalClusterQueues: []kueue.ClusterQueue{ *utiltesting.MakeClusterQueue("other-alpha"). Cohort("other"). @@ -2578,6 +2585,111 @@ func TestSchedule(t *testing.T) { "eng-beta/b1": *utiltesting.MakeAdmission("other-beta").Assignment("gpu", "spot", "5").Obj(), }, }, + "workload requiring reclaimation prioritized over wl in another full cq": { + // Also see #3405. + // + // CQ2 is lending out capacity to its + // Cohort. It has a pending workload, WL2, + // that fits within nominal capacity, and a + // reclaim policy set to any. + + // CQ1 is using half of its capacity, and is + // also lending out remaining capacity. + + // CQ3 has no capacity of its own, and is + // borrowing 10 nominal capacity. + + // With a pending workloads WL1 and WL2 queued + // in CQ1 and CQ2 respectively, we want to + // make sure that the WL2 is processed first, + // so that its preemption calculations are not + // invalidated by CQ1's WL1, which won't fit + // into its nominal capacity given the + // admitted Admitted-Workload-1. + + // As WL1 has an earlier creation timestamp + // than WL2, there was a bug where it would + // process first, reserving capacity which + // invalidated WL2's preemption calculations, + // blocking it indefinitely from reclaiming + // its nominal capacity. + // + // We don't test legacy mode as it classifies + // inadmissible/left different, and we will + // delete that logic shortly. + multiplePreemptions: MultiplePremptions, + additionalClusterQueues: []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("CQ1"). + Cohort("other"). + ResourceGroup( + utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "10").FlavorQuotas, + ). + Obj(), + *utiltesting.MakeClusterQueue("CQ2"). + Cohort("other"). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + }). + ResourceGroup( + utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "10").FlavorQuotas, + ). + Obj(), + *utiltesting.MakeClusterQueue("CQ3"). + Cohort("other"). + ResourceGroup( + utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "0").FlavorQuotas, + ). + Obj(), + }, + additionalLocalQueues: []kueue.LocalQueue{ + *utiltesting.MakeLocalQueue("lq", "eng-alpha").ClusterQueue("CQ1").Obj(), + *utiltesting.MakeLocalQueue("lq", "eng-beta").ClusterQueue("CQ2").Obj(), + *utiltesting.MakeLocalQueue("lq", "eng-gamma").ClusterQueue("CQ3").Obj(), + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("Admitted-Workload-1", "eng-alpha"). + Queue("lq"). + Request("gpu", "5"). + SimpleReserveQuota("CQ1", "on-demand", now). + Obj(), + *utiltesting.MakeWorkload("WL1", "eng-alpha"). + Creation(now). + Queue("lq"). + Request("gpu", "10"). + Obj(), + *utiltesting.MakeWorkload("WL2", "eng-beta"). + Creation(now.Add(time.Second)). + Queue("lq"). + Request("gpu", "10"). + Obj(), + *utiltesting.MakeWorkload("Admitted-Workload-2", "eng-gamma"). + Queue("lq"). + Priority(0). + Request("gpu", "5"). + SimpleReserveQuota("CQ3", "on-demand", now). + Obj(), + *utiltesting.MakeWorkload("Admitted-Workload-3", "eng-gamma"). + Queue("lq"). + Priority(1). + Request("gpu", "5"). + SimpleReserveQuota("CQ3", "on-demand", now). + Obj(), + }, + wantPreempted: sets.Set[string]( + sets.NewString("eng-gamma/Admitted-Workload-2"), + ), + wantLeft: map[string][]string{ + "CQ2": {"eng-beta/WL2"}, + }, + wantInadmissibleLeft: map[string][]string{ + "CQ1": {"eng-alpha/WL1"}, + }, + wantAssignments: map[string]kueue.Admission{ + "eng-alpha/Admitted-Workload-1": *utiltesting.MakeAdmission("CQ1").Assignment("gpu", "on-demand", "5").Obj(), + "eng-gamma/Admitted-Workload-2": *utiltesting.MakeAdmission("CQ3").Assignment("gpu", "on-demand", "5").Obj(), + "eng-gamma/Admitted-Workload-3": *utiltesting.MakeAdmission("CQ3").Assignment("gpu", "on-demand", "5").Obj(), + }, + }, } for name, tc := range cases { diff --git a/test/integration/scheduler/fairsharing/fair_sharing_test.go b/test/integration/scheduler/fairsharing/fair_sharing_test.go index cb98cfc4ce9..feb01690602 100644 --- a/test/integration/scheduler/fairsharing/fair_sharing_test.go +++ b/test/integration/scheduler/fairsharing/fair_sharing_test.go @@ -176,7 +176,7 @@ var _ = ginkgo.Describe("Scheduler", func() { Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, Reason: "Pending", - Message: "couldn't assign flavors to pod set main: insufficient unused quota in cohort for cpu in flavor default, 2 more needed", + Message: "couldn't assign flavors to pod set main: insufficient quota for cpu in flavor default, request > maximum capacity (10 > 8)", }, util.IgnoreConditionTimestampsAndObservedGeneration), )) }, util.Timeout, util.Interval).Should(gomega.Succeed()) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 94fd8859c92..6560dff4203 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -1671,9 +1671,10 @@ var _ = ginkgo.Describe("Scheduler", func() { return k8sClient.Update(ctx, &cq) }, util.Timeout, util.Interval).Should(gomega.Succeed()) - // pendingWl exceed nominal+borrowing quota and cannot preempt due to low priority. + // pendingWl exceed nominal+borrowing quota and cannot preempt as priority based + // premption within CQ is disabled. pendingWl := testing.MakeWorkload("pending-wl", matchingNS.Name).Queue(strictFIFOLocalQueue. - Name).Request(corev1.ResourceCPU, "3").Priority(9).Obj() + Name).Request(corev1.ResourceCPU, "3").Priority(99).Obj() gomega.Expect(k8sClient.Create(ctx, pendingWl)).Should(gomega.Succeed()) // borrowingWL can borrow shared resources, so it should be scheduled even if workloads @@ -2294,7 +2295,7 @@ var _ = ginkgo.Describe("Scheduler", func() { Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, Reason: "Pending", - Message: "couldn't assign flavors to pod set main: insufficient unused quota in cohort for memory in flavor on-demand, 1Gi more needed", + Message: "couldn't assign flavors to pod set main: insufficient unused quota for memory in flavor on-demand, 1Gi more needed", }, util.IgnoreConditionTimestampsAndObservedGeneration), )) }, util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -2336,7 +2337,7 @@ var _ = ginkgo.Describe("Scheduler", func() { Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, Reason: "Pending", - Message: "couldn't assign flavors to pod set main: insufficient unused quota in cohort for memory in flavor on-demand, 2Gi more needed", + Message: "couldn't assign flavors to pod set main: insufficient unused quota for memory in flavor on-demand, 1Gi more needed", }, util.IgnoreConditionTimestampsAndObservedGeneration), )) }, util.Timeout, util.Interval).Should(gomega.Succeed())