Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ func (c *ClusterQueueSnapshot) Available(fr resources.FlavorResource) int64 {
return max(0, capacityAvailable)
}

func (c *ClusterQueueSnapshot) PotentialAvailable(fr resources.FlavorResource) int64 {
if c.Cohort == nil {
return c.nominal(fr)
}
potential := c.RequestableCohortQuota(fr)

// if the borrowing limit exists, we cap our potential capacity by the borrowing limit.
if borrowingLimit := c.borrowingLimit(fr); borrowingLimit != nil {
withBorrowingRemaining := c.nominal(fr) + *borrowingLimit
potential = min(potential, withBorrowingRemaining)
}
return max(0, potential)
}

func (c *ClusterQueueSnapshot) nominal(fr resources.FlavorResource) int64 {
if quota := c.QuotaFor(fr); quota != nil {
return quota.Nominal
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ func ResourceQuantity(name corev1.ResourceName, v int64) resource.Quantity {
return *resource.NewQuantity(v, resource.DecimalSI)
}
}
func ResourceQuantityString(name corev1.ResourceName, v int64) string {
rq := ResourceQuantity(name, v)
return rq.String()
}
66 changes: 23 additions & 43 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,57 +597,37 @@ func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.Set[string]) nodeaffi
// could help), it returns a Status with reasons.
func (a *FlavorAssigner) fitsResourceQuota(log logr.Logger, fr resources.FlavorResource, val int64, rQuota *cache.ResourceQuota) (granularMode, bool, *Status) {
var status Status
var borrow bool
used := a.cq.Usage[fr]
mode := noFit
if val <= rQuota.Nominal {
// The request can be satisfied by the nominal quota, assuming quota is
// reclaimed from the cohort or assuming all active workloads in the
// ClusterQueue are preempted.
mode = preempt
}
cohortAvailable := rQuota.Nominal
if a.cq.Cohort != nil {
cohortAvailable = a.cq.RequestableCohortQuota(fr)
}

if a.canPreemptWhileBorrowing() {
// when preemption with borrowing is enabled, we can succeed to admit the
// workload if preemption is used.
if (rQuota.BorrowingLimit == nil || val <= rQuota.Nominal+*rQuota.BorrowingLimit) && val <= cohortAvailable {
mode = preempt
borrow = val > rQuota.Nominal
}
}
if rQuota.BorrowingLimit != nil && used+val > rQuota.Nominal+*rQuota.BorrowingLimit {
status.append(fmt.Sprintf("borrowing limit for %s in flavor %s exceeded", fr.Resource, fr.Flavor))
return mode, borrow, &status
}

if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) {
mode = reclaim
}
borrow := a.cq.BorrowingWith(fr, val) && a.cq.Cohort != nil
available := a.cq.Available(fr)
maxCapacity := a.cq.PotentialAvailable(fr)

cohortUsed := used
if a.cq.Cohort != nil {
cohortUsed = a.cq.UsedCohortQuota(fr)
// No Fit
if val > maxCapacity {
status.append(fmt.Sprintf("insufficient quota for %s in flavor %s, request > maximum capacity (%s > %s)",
fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val), resources.ResourceQuantityString(fr.Resource, maxCapacity)))
return noFit, false, &status
}

lack := cohortUsed + val - cohortAvailable
if lack <= 0 {
return fit, used+val > rQuota.Nominal, nil
// Fit
if val <= available {
return fit, borrow, nil
}

lackQuantity := resources.ResourceQuantity(fr.Resource, lack)
msg := fmt.Sprintf("insufficient unused quota in cohort for %s in flavor %s, %s more needed", fr.Resource, fr.Flavor, &lackQuantity)
if a.cq.Cohort == nil {
if mode == noFit {
msg = fmt.Sprintf("insufficient quota for %s in flavor %s in ClusterQueue", fr.Resource, fr.Flavor)
} else {
msg = fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed", fr.Resource, fr.Flavor, &lackQuantity)
// Check if preemption is possible
mode := noFit
if val <= rQuota.Nominal {
mode = preempt
if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) {
mode = reclaim
}
} else if a.canPreemptWhileBorrowing() {
mode = preempt
}
status.append(msg)

status.append(fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed",
fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val-available)))

return mode, borrow, &status
}

Expand Down
37 changes: 20 additions & 17 deletions pkg/scheduler/flavorassigner/flavorassigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package flavorassigner

import (
"context"
"fmt"
"testing"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -259,7 +258,7 @@ func TestAssignFlavors(t *testing.T) {
},
Status: &Status{
reasons: []string{
"insufficient quota for memory in flavor b_one in ClusterQueue",
"insufficient quota for memory in flavor b_one, request > maximum capacity (10Mi > 1Mi)",
},
},
Count: 1,
Expand Down Expand Up @@ -373,13 +372,14 @@ func TestAssignFlavors(t *testing.T) {
},
Status: &Status{
reasons: []string{
"insufficient unused quota in cohort for cpu in flavor one, 1 more needed",
"insufficient unused quota in cohort for memory in flavor two, 5Mi more needed",
"insufficient unused quota in cohort for example.com/gpu in flavor b_one, 1 more needed",
"insufficient quota for cpu in flavor one, request > maximum capacity (3 > 2)",
"insufficient unused quota for memory in flavor two, 5Mi more needed",
"insufficient unused quota for example.com/gpu in flavor b_one, 1 more needed",
},
},
Count: 1,
}},
Borrowing: true,
Usage: resources.FlavorResourceQuantities{
{Flavor: "two", Resource: corev1.ResourceCPU}: 3_000,
{Flavor: "two", Resource: corev1.ResourceMemory}: 10 * utiltesting.Mi,
Expand Down Expand Up @@ -414,8 +414,8 @@ func TestAssignFlavors(t *testing.T) {
},
Status: &Status{
reasons: []string{
"insufficient quota for cpu in flavor one in ClusterQueue",
"insufficient quota for memory in flavor two in ClusterQueue",
"insufficient quota for cpu in flavor one, request > maximum capacity (3 > 2)",
"insufficient quota for memory in flavor two, request > maximum capacity (10Mi > 5Mi)",
},
},
Count: 1,
Expand Down Expand Up @@ -839,7 +839,7 @@ func TestAssignFlavors(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("2"),
},
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 1 more needed"},
reasons: []string{"insufficient unused quota for cpu in flavor one, 1 more needed"},
},
Count: 1,
}},
Expand Down Expand Up @@ -882,10 +882,11 @@ func TestAssignFlavors(t *testing.T) {
},

Status: &Status{
reasons: []string{"borrowing limit for cpu in flavor one exceeded"},
reasons: []string{"insufficient unused quota for cpu in flavor one, 1 more needed"},
},
Count: 1,
}},
Borrowing: true,
Usage: resources.FlavorResourceQuantities{
{Flavor: "one", Resource: corev1.ResourceCPU}: 2_000,
},
Expand Down Expand Up @@ -960,10 +961,11 @@ func TestAssignFlavors(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("2"),
},
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 2 more needed"},
reasons: []string{"insufficient unused quota for cpu in flavor one, 2 more needed"},
},
Count: 1,
}},
Borrowing: true,
Usage: resources.FlavorResourceQuantities{
{Flavor: "one", Resource: corev1.ResourceCPU}: 2_000,
},
Expand Down Expand Up @@ -1077,7 +1079,7 @@ func TestAssignFlavors(t *testing.T) {
},
Status: &Status{
reasons: []string{
"insufficient quota for cpu in flavor one in ClusterQueue",
"insufficient quota for cpu in flavor one, request > maximum capacity (12 > 4)",
"insufficient unused quota for cpu in flavor tainted, 3 more needed",
},
},
Expand Down Expand Up @@ -1173,7 +1175,7 @@ func TestAssignFlavors(t *testing.T) {
corev1.ResourcePods: resource.MustParse("3"),
},
Status: &Status{
reasons: []string{fmt.Sprintf("insufficient quota for %s in flavor default in ClusterQueue", corev1.ResourcePods)},
reasons: []string{"insufficient quota for pods in flavor default, request > maximum capacity (3 > 2)"},
},
Count: 3,
}},
Expand Down Expand Up @@ -1510,7 +1512,7 @@ func TestAssignFlavors(t *testing.T) {
corev1.ResourceCPU: {Name: "one", Mode: Preempt, TriedFlavorIdx: 0},
},
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 10 more needed"},
reasons: []string{"insufficient unused quota for cpu in flavor one, 10 more needed"},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("12"),
Expand Down Expand Up @@ -1565,7 +1567,7 @@ func TestAssignFlavors(t *testing.T) {
corev1.ResourceCPU: {Name: "one", Mode: Preempt, TriedFlavorIdx: 0},
},
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 10 more needed"},
reasons: []string{"insufficient unused quota for cpu in flavor one, 10 more needed"},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("12"),
Expand Down Expand Up @@ -1664,7 +1666,7 @@ func TestAssignFlavors(t *testing.T) {
{
Name: "main",
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 11 more needed"},
reasons: []string{"insufficient quota for cpu in flavor one, request > maximum capacity (12 > 11)"},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("12"),
Expand Down Expand Up @@ -1821,10 +1823,11 @@ func TestAssignFlavors(t *testing.T) {
corev1.ResourcePods: resource.MustParse("1"),
},
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 1 more needed"},
reasons: []string{"insufficient unused quota for cpu in flavor one, 1 more needed"},
},
Count: 1,
}},
Borrowing: true,
Usage: resources.FlavorResourceQuantities{
{Flavor: "one", Resource: corev1.ResourceCPU}: 9_000,
{Flavor: "one", Resource: corev1.ResourcePods}: 1,
Expand Down Expand Up @@ -1868,7 +1871,7 @@ func TestAssignFlavors(t *testing.T) {
corev1.ResourceCPU: {Name: "one", Mode: Preempt, TriedFlavorIdx: 0},
},
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 10 more needed"},
reasons: []string{"insufficient unused quota for cpu in flavor one, 10 more needed"},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("12"),
Expand Down
112 changes: 112 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,13 @@ func TestSchedule(t *testing.T) {
},
},
"multiple CQs need preemption": {
// legacy mode considers eng-alpha/pending to
// be left in queue, rather than inadmissible,
// due to legacy overlapping preemption skip
// logic. Since we are deleting legacy code in
// 0.10, we will disable this test for it,
// rather than duplicate it.
multiplePreemptions: MultiplePremptions,
additionalClusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("other-alpha").
Cohort("other").
Expand Down Expand Up @@ -2578,6 +2585,111 @@ func TestSchedule(t *testing.T) {
"eng-beta/b1": *utiltesting.MakeAdmission("other-beta").Assignment("gpu", "spot", "5").Obj(),
},
},
"workload requiring reclaimation prioritized over wl in another full cq": {
// Also see #3405.
//
// CQ2 is lending out capacity to its
// Cohort. It has a pending workload, WL2,
// that fits within nominal capacity, and a
// reclaim policy set to any.

// CQ1 is using half of its capacity, and is
// also lending out remaining capacity.

// CQ3 has no capacity of its own, and is
// borrowing 10 nominal capacity.

// With a pending workloads WL1 and WL2 queued
// in CQ1 and CQ2 respectively, we want to
// make sure that the WL2 is processed first,
// so that its preemption calculations are not
// invalidated by CQ1's WL1, which won't fit
// into its nominal capacity given the
// admitted Admitted-Workload-1.

// As WL1 has an earlier creation timestamp
// than WL2, there was a bug where it would
// process first, reserving capacity which
// invalidated WL2's preemption calculations,
// blocking it indefinitely from reclaiming
// its nominal capacity.
//
// We don't test legacy mode as it classifies
// inadmissible/left different, and we will
// delete that logic shortly.
multiplePreemptions: MultiplePremptions,
additionalClusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("CQ1").
Cohort("other").
ResourceGroup(
utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "10").FlavorQuotas,
).
Obj(),
*utiltesting.MakeClusterQueue("CQ2").
Cohort("other").
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
}).
ResourceGroup(
utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "10").FlavorQuotas,
).
Obj(),
*utiltesting.MakeClusterQueue("CQ3").
Cohort("other").
ResourceGroup(
utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "0").FlavorQuotas,
).
Obj(),
},
additionalLocalQueues: []kueue.LocalQueue{
*utiltesting.MakeLocalQueue("lq", "eng-alpha").ClusterQueue("CQ1").Obj(),
*utiltesting.MakeLocalQueue("lq", "eng-beta").ClusterQueue("CQ2").Obj(),
*utiltesting.MakeLocalQueue("lq", "eng-gamma").ClusterQueue("CQ3").Obj(),
},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("Admitted-Workload-1", "eng-alpha").
Queue("lq").
Request("gpu", "5").
SimpleReserveQuota("CQ1", "on-demand", now).
Obj(),
*utiltesting.MakeWorkload("WL1", "eng-alpha").
Creation(now).
Queue("lq").
Request("gpu", "10").
Obj(),
*utiltesting.MakeWorkload("WL2", "eng-beta").
Creation(now.Add(time.Second)).
Queue("lq").
Request("gpu", "10").
Obj(),
*utiltesting.MakeWorkload("Admitted-Workload-2", "eng-gamma").
Queue("lq").
Priority(0).
Request("gpu", "5").
SimpleReserveQuota("CQ3", "on-demand", now).
Obj(),
*utiltesting.MakeWorkload("Admitted-Workload-3", "eng-gamma").
Queue("lq").
Priority(1).
Request("gpu", "5").
SimpleReserveQuota("CQ3", "on-demand", now).
Obj(),
},
wantPreempted: sets.Set[string](
sets.NewString("eng-gamma/Admitted-Workload-2"),
),
wantLeft: map[string][]string{
"CQ2": {"eng-beta/WL2"},
},
wantInadmissibleLeft: map[string][]string{
"CQ1": {"eng-alpha/WL1"},
},
wantAssignments: map[string]kueue.Admission{
"eng-alpha/Admitted-Workload-1": *utiltesting.MakeAdmission("CQ1").Assignment("gpu", "on-demand", "5").Obj(),
"eng-gamma/Admitted-Workload-2": *utiltesting.MakeAdmission("CQ3").Assignment("gpu", "on-demand", "5").Obj(),
"eng-gamma/Admitted-Workload-3": *utiltesting.MakeAdmission("CQ3").Assignment("gpu", "on-demand", "5").Obj(),
},
},
}

for name, tc := range cases {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "couldn't assign flavors to pod set main: insufficient unused quota in cohort for cpu in flavor default, 2 more needed",
Message: "couldn't assign flavors to pod set main: insufficient quota for cpu in flavor default, request > maximum capacity (10 > 8)",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
Expand Down
Loading