Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,61 @@ var _ = Describe("Provisioning", func() {
corev1.ResourceMemory: resource.MustParse("5Gi"),
}, node.Status.Capacity)
})
It("should schedule based on the pod level resources requests", func() {
	// Pod-level resources graduated to on-by-default in Kubernetes 1.34; skip on older control planes.
	if env.Version.Minor() < 34 {
		Skip("Pod level resources is only on by default starting in K8s version >= 1.34.x")
	}

	ExpectApplied(ctx, env.Client, test.NodePool())

	// Add three instance types, one that's what we want, one that's slightly smaller, one that's slightly bigger.
	// If we miscalculate resources, we'll schedule to the smaller instance type rather than the larger one
	cloudProvider.InstanceTypes = AddInstanceResources(cloudProvider.InstanceTypes, corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", 10)),
		corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dGi", 4)),
	})
	cloudProvider.InstanceTypes = AddInstanceResources(cloudProvider.InstanceTypes, corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", 11)),
		corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dGi", 5)),
	})
	cloudProvider.InstanceTypes = AddInstanceResources(cloudProvider.InstanceTypes, corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", 12)),
		corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dGi", 6)),
	})

	pod := test.UnschedulablePod(test.PodOptions{
		// Pod-level resources (pod.spec.resources): requests sit just below the 10-CPU/4Gi instance size.
		PodResourceRequirements: corev1.ResourceRequirements{
			Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("10"), corev1.ResourceMemory: resource.MustParse("4Gi")},
			Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("9.9"), corev1.ResourceMemory: resource.MustParse("3.9Gi")},
		},
		// The single regular container's resources.
		ResourceRequirements: corev1.ResourceRequirements{
			Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("5"), corev1.ResourceMemory: resource.MustParse("1Gi")},
			Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("5"), corev1.ResourceMemory: resource.MustParse("1Gi")},
		},
		InitContainers: []corev1.Container{
			// A run-to-completion init container.
			{
				Resources: corev1.ResourceRequirements{
					Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("6"), corev1.ResourceMemory: resource.MustParse("2Gi")},
					Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("6"), corev1.ResourceMemory: resource.MustParse("2Gi")},
				},
			},
			// A native sidecar (restartPolicy: Always) init container that runs for the pod's lifetime.
			{
				RestartPolicy: lo.ToPtr(corev1.ContainerRestartPolicyAlways),
				Resources: corev1.ResourceRequirements{
					Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4"), corev1.ResourceMemory: resource.MustParse("2Gi")},
					Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4"), corev1.ResourceMemory: resource.MustParse("2Gi")},
				},
			},
		},
	})

	// NOTE(review): the expected fit is the 10-CPU/4Gi instance — presumably the effective request
	// is the max of the pod-level requests and the init+sidecar peak; confirm against the scheduler's
	// aggregation if this assertion ever flakes.
	ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
	node := ExpectScheduled(ctx, env.Client, pod)
	ExpectResources(corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("10"),
		corev1.ResourceMemory: resource.MustParse("4Gi"),
	}, node.Status.Capacity)
})

Context("Resource Limits", func() {
It("should not schedule when limits are exceeded", func() {
Expand Down Expand Up @@ -961,7 +1016,7 @@ var _ = Describe("Provisioning", func() {
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: corev1.ResourceRequirements{
Limits: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("10000"), corev1.ResourceMemory: resource.MustParse("10000Gi")},
Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")},
Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("10000Gi")}, // simulate the API server’s defaulting from limits
},
}},
))
Expand Down Expand Up @@ -1008,7 +1063,7 @@ var _ = Describe("Provisioning", func() {
{
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("10000"), corev1.ResourceMemory: resource.MustParse("10000Gi")},
Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")},
Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("10000Gi")}, // simulate the API server’s defaulting from limits
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/test/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type PodOptions struct {
Overhead v1.ResourceList
PriorityClassName string
InitContainers []v1.Container
PodResourceRequirements v1.ResourceRequirements
ResourceRequirements v1.ResourceRequirements
NodeSelector map[string]string
NodeRequirements []v1.NodeSelectorRequirement
Expand Down Expand Up @@ -131,6 +132,7 @@ func Pod(overrides ...PodOptions) *v1.Pod {
Affinity: buildAffinity(options),
TopologySpreadConstraints: options.TopologySpreadConstraints,
Tolerations: options.Tolerations,
Resources: &options.PodResourceRequirements,
Containers: []v1.Container{{
Name: RandomName(),
Image: options.Image,
Expand Down
92 changes: 5 additions & 87 deletions pkg/utils/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package resources

import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
resourcehelper "k8s.io/component-helpers/resource"

"sigs.k8s.io/karpenter/pkg/utils/pretty"
)
Expand Down Expand Up @@ -108,78 +108,12 @@ func SubtractFrom(dest v1.ResourceList, src v1.ResourceList) {
}
}

// podRequests calculates the max between the sum of container resources and max of initContainers along with sidecar feature consideration
// inspired from https://github.com/kubernetes/kubernetes/blob/e2afa175e4077d767745246662170acd86affeaf/pkg/api/v1/resource/helpers.go#L96
// https://kubernetes.io/blog/2023/08/25/native-sidecar-containers/
func podRequests(pod *v1.Pod) v1.ResourceList {
requests := v1.ResourceList{}
restartableInitContainerReqs := v1.ResourceList{}
maxInitContainerReqs := v1.ResourceList{}

for _, container := range pod.Spec.Containers {
MergeInto(requests, MergeResourceLimitsIntoRequests(container))
}

for _, container := range pod.Spec.InitContainers {
containerReqs := MergeResourceLimitsIntoRequests(container)
// If the init container's policy is "Always", then we need to add this container's requests to the total requests. We also need to track this container's request as the required requests for other initContainers
if lo.FromPtr(container.RestartPolicy) == v1.ContainerRestartPolicyAlways {
MergeInto(requests, containerReqs)
MergeInto(restartableInitContainerReqs, containerReqs)
maxInitContainerReqs = MaxResources(maxInitContainerReqs, restartableInitContainerReqs)

} else {
// Else, check whether the current container's resource requests combined with the restartableInitContainer requests are greater than the current max
maxInitContainerReqs = MaxResources(maxInitContainerReqs, Merge(containerReqs, restartableInitContainerReqs))
}
}
// The container's needed requests are the max of all of the container requests combined with native sidecar container requests OR the requests required for a large init containers with native sidecar container requests to run
requests = MaxResources(requests, maxInitContainerReqs)

if pod.Spec.Overhead != nil {
MergeInto(requests, pod.Spec.Overhead)
}

return requests
}

// podLimits calculates the max between the sum of container resources and max of initContainers along with sidecar feature consideration
// inspired from https://github.com/kubernetes/kubernetes/blob/e2afa175e4077d767745246662170acd86affeaf/pkg/api/v1/resource/helpers.go#L96
// https://kubernetes.io/blog/2023/08/25/native-sidecar-containers/
// podLimits calculates the effective resource limits for a pod: the max of
// (a) the sum of all regular container limits plus sidecar (restartPolicy: Always)
// init container limits, and (b) the largest single init container limit stacked
// on top of the sidecars that are already running when it executes.
// Unlike podRequests, limits are taken as-is (no defaulting from requests).
// inspired from https://github.com/kubernetes/kubernetes/blob/e2afa175e4077d767745246662170acd86affeaf/pkg/api/v1/resource/helpers.go#L96
// https://kubernetes.io/blog/2023/08/25/native-sidecar-containers/
func podLimits(pod *v1.Pod) v1.ResourceList {
	limits := v1.ResourceList{}
	// Limits contributed by sidecar init containers seen so far in declaration order.
	restartableInitContainerLimits := v1.ResourceList{}
	// Running max across init containers, each combined with the sidecars active at its point in the startup sequence.
	maxInitContainerLimits := v1.ResourceList{}

	for _, container := range pod.Spec.Containers {
		MergeInto(limits, container.Resources.Limits)
	}

	for _, container := range pod.Spec.InitContainers {
		// If the init container's policy is "Always", then we need to add this container's limits to the total limits. We also need to track this container's limit as the required limits for other initContainers
		if lo.FromPtr(container.RestartPolicy) == v1.ContainerRestartPolicyAlways {
			MergeInto(limits, container.Resources.Limits)
			MergeInto(restartableInitContainerLimits, container.Resources.Limits)
			maxInitContainerLimits = MaxResources(maxInitContainerLimits, restartableInitContainerLimits)
		} else {
			// Else, check whether the current container's resource limits combined with the restartableInitContainer limits are greater than the current max.
			// Order matters here: only sidecars declared BEFORE this init container are included in its peak.
			maxInitContainerLimits = MaxResources(maxInitContainerLimits, Merge(container.Resources.Limits, restartableInitContainerLimits))
		}
	}
	// The container's needed limits are the max of all of the container limits combined with native sidecar container limits OR the limits required for a large init containers with native sidecar container limits to run
	limits = MaxResources(limits, maxInitContainerLimits)

	// Pod overhead (e.g. from a RuntimeClass) is always added on top of the computed max.
	if pod.Spec.Overhead != nil {
		MergeInto(limits, pod.Spec.Overhead)
	}

	return limits
}

// Ceiling computes the effective resource requirements for a given Pod,
// using the same logic as the scheduler.
func Ceiling(pod *v1.Pod) v1.ResourceRequirements {
return v1.ResourceRequirements{
Requests: podRequests(pod),
Limits: podLimits(pod),
Requests: resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{}),
Limits: resourcehelper.PodLimits(pod, resourcehelper.PodResourcesOptions{}),
}
}

Expand All @@ -196,22 +130,6 @@ func MaxResources(resources ...v1.ResourceList) v1.ResourceList {
return resourceList
}

// MergeResourceLimitsIntoRequests merges resource limits into requests if no request exists for the given resource
func MergeResourceLimitsIntoRequests(container v1.Container) v1.ResourceList {
ret := v1.ResourceList{}
for resourceName, quantity := range container.Resources.Requests {
ret[resourceName] = quantity
}
if container.Resources.Limits != nil {
for resourceName, quantity := range container.Resources.Limits {
if _, ok := container.Resources.Requests[resourceName]; !ok {
ret[resourceName] = quantity
}
}
}
return ret
}

// Quantity parses the string value into a *Quantity
func Quantity(value string) *resource.Quantity {
r := resource.MustParse(value)
Expand Down
105 changes: 77 additions & 28 deletions pkg/utils/resources/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,37 +565,86 @@ var _ = Describe("Resources", func() {
})
})
})
})
Context("Resource Merging", func() {
It("should merge resource limits into requests if no request exists for the given container", func() {
container := v1.Container{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("1Gi"),
Context("Pod Level Resources", func() {
It("should calculate resource requests when the pod level resources is specified", func() {
pod := test.Pod(test.PodOptions{
PodResourceRequirements: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("4"), v1.ResourceMemory: resource.MustParse("4Gi")},
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("2Gi")},
},
},
}
requests := resources.MergeResourceLimitsIntoRequests(container)
ExpectResources(requests, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("1Gi"),
ResourceRequirements: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3"), v1.ResourceMemory: resource.MustParse("3Gi")},
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
},
})
podResources := resources.Ceiling(pod)
ExpectResources(podResources.Requests, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("2Gi"),
})
ExpectResources(podResources.Limits, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
v1.ResourceMemory: resource.MustParse("4Gi"),
})
})
})
It("should merge resource limits into requests if no request exists for the given sidecarContainer", func() {
container := v1.Container{
RestartPolicy: lo.ToPtr(v1.ContainerRestartPolicyAlways),
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("1Gi"),
It("should calculate resource requests when only the pod level resources request is specified", func() {
pod := test.Pod(test.PodOptions{
PodResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("2Gi")},
},
},
}
requests := resources.MergeResourceLimitsIntoRequests(container)
ExpectResources(requests, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("1Gi"),
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
},
InitContainers: []v1.Container{
{
RestartPolicy: lo.ToPtr(v1.ContainerRestartPolicyAlways),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
},
},
},
})
podResources := resources.Ceiling(pod)
ExpectResources(podResources.Requests, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("2Gi"),
})
ExpectResources(podResources.Limits, v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("2Gi"),
})
})
It("should calculate resource requests when the pod level resources requests is defaulted from limits", func() {
	// Pod-level requests are set equal to pod-level limits, mirroring what the API server
	// does when only limits are specified. They dominate the container + sidecar aggregate
	// (1 CPU container + 1 CPU sidecar = 2 CPU), so the pod-level values should win.
	pod := test.Pod(test.PodOptions{
		PodResourceRequirements: v1.ResourceRequirements{
			Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10"), v1.ResourceMemory: resource.MustParse("10Gi")}, // simulate the API server’s defaulting from limits
			Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("10"), v1.ResourceMemory: resource.MustParse("10Gi")},
		},
		// The single regular container's resources.
		ResourceRequirements: v1.ResourceRequirements{
			Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
			Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
		},
		InitContainers: []v1.Container{
			// A native sidecar (restartPolicy: Always) init container.
			{
				RestartPolicy: lo.ToPtr(v1.ContainerRestartPolicyAlways),
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
					Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
				},
			},
		},
	})
	podResources := resources.Ceiling(pod)
	ExpectResources(podResources.Requests, v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("10"),
		v1.ResourceMemory: resource.MustParse("10Gi"),
	})
	ExpectResources(podResources.Limits, v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("10"),
		v1.ResourceMemory: resource.MustParse("10Gi"),
	})
})
})
})
Expand Down
Loading