From 9aab1150783ef859f0c8bd198f39c335159408d7 Mon Sep 17 00:00:00 2001
From: Tsubasa Nagasawa
Date: Fri, 18 Jul 2025 16:54:42 +0900
Subject: [PATCH 1/2] feat: Support Pod Level Resources
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Replaced Karpenter’s custom pod resource calculation logic with the
shared k8s.io/component-helpers/resource package.

Signed-off-by: Tsubasa Nagasawa
---
 pkg/controllers/provisioning/suite_test.go |  55 +++++++++++
 pkg/test/pods.go                           |   2 +
 pkg/utils/resources/resources.go           |  92 +-----------------
 pkg/utils/resources/suite_test.go          | 105 +++++++++++++++------
 4 files changed, 139 insertions(+), 115 deletions(-)

diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go
index ed9392ed74..451a2ab304 100644
--- a/pkg/controllers/provisioning/suite_test.go
+++ b/pkg/controllers/provisioning/suite_test.go
@@ -681,6 +681,61 @@ var _ = Describe("Provisioning", func() {
 				corev1.ResourceMemory: resource.MustParse("5Gi"),
 			}, node.Status.Capacity)
 		})
+		It("should schedule based on the pod level resource requests", func() {
+			if env.Version.Minor() < 34 {
+				Skip("Pod level resources are only on by default starting in K8s version >= 1.34.x")
+			}
+
+			ExpectApplied(ctx, env.Client, test.NodePool())
+
+			// Add three instance types: one that fits the pod's requests exactly and two that are slightly bigger.
+			// If we miscalculate resources, we'll schedule to one of the larger instance types rather than the smallest one that fits.
+			cloudProvider.InstanceTypes = AddInstanceResources(cloudProvider.InstanceTypes, corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("10"),
+				corev1.ResourceMemory: resource.MustParse("4Gi"),
+			})
+			cloudProvider.InstanceTypes = AddInstanceResources(cloudProvider.InstanceTypes, corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("11"),
+				corev1.ResourceMemory: resource.MustParse("5Gi"),
+			})
+			cloudProvider.InstanceTypes = AddInstanceResources(cloudProvider.InstanceTypes, corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("12"),
+				corev1.ResourceMemory: resource.MustParse("6Gi"),
+			})
+
+			pod := test.UnschedulablePod(test.PodOptions{
+				PodResourceRequirements: corev1.ResourceRequirements{
+					Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("10"), corev1.ResourceMemory: resource.MustParse("4Gi")},
+					Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("9.9"), corev1.ResourceMemory: resource.MustParse("3.9Gi")},
+				},
+				ResourceRequirements: corev1.ResourceRequirements{
+					Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("5"), corev1.ResourceMemory: resource.MustParse("1Gi")},
+					Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("5"), corev1.ResourceMemory: resource.MustParse("1Gi")},
+				},
+				InitContainers: []corev1.Container{
+					{
+						Resources: corev1.ResourceRequirements{
+							Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("6"), corev1.ResourceMemory: resource.MustParse("2Gi")},
+							Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("6"), corev1.ResourceMemory: resource.MustParse("2Gi")},
+						},
+					},
+					{
+						RestartPolicy: lo.ToPtr(corev1.ContainerRestartPolicyAlways),
+						Resources: corev1.ResourceRequirements{
+							Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4"), corev1.ResourceMemory: resource.MustParse("2Gi")},
+							Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4"), corev1.ResourceMemory: resource.MustParse("2Gi")},
+						},
+					},
+				},
+			})
+
+			ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
+			node := ExpectScheduled(ctx, env.Client, pod)
+			ExpectResources(corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("10"),
+				corev1.ResourceMemory: resource.MustParse("4Gi"),
+			}, node.Status.Capacity)
+		})
 	Context("Resource Limits", func() {
 		It("should not schedule when limits are exceeded", func() {
diff --git a/pkg/test/pods.go b/pkg/test/pods.go
index b26ff141fb..74d4f11d48 100644
--- a/pkg/test/pods.go
+++ b/pkg/test/pods.go
@@ -37,6 +37,7 @@ type PodOptions struct {
 	Overhead                  v1.ResourceList
 	PriorityClassName         string
 	InitContainers            []v1.Container
+	PodResourceRequirements   v1.ResourceRequirements
 	ResourceRequirements      v1.ResourceRequirements
 	NodeSelector              map[string]string
 	NodeRequirements          []v1.NodeSelectorRequirement
@@ -131,6 +132,7 @@ func Pod(overrides ...PodOptions) *v1.Pod {
 			Affinity:                  buildAffinity(options),
 			TopologySpreadConstraints: options.TopologySpreadConstraints,
 			Tolerations:               options.Tolerations,
+			Resources:                 &options.PodResourceRequirements,
 			Containers: []v1.Container{{
 				Name:  RandomName(),
 				Image: options.Image,
diff --git a/pkg/utils/resources/resources.go b/pkg/utils/resources/resources.go
index e1bfb5fc1f..2590963ee8 100644
--- a/pkg/utils/resources/resources.go
+++ b/pkg/utils/resources/resources.go
@@ -17,9 +17,9 @@ limitations under the License.
 package resources
 
 import (
-	"github.com/samber/lo"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	resourcehelper "k8s.io/component-helpers/resource"
 
 	"sigs.k8s.io/karpenter/pkg/utils/pretty"
 )
@@ -108,78 +108,12 @@ func SubtractFrom(dest v1.ResourceList, src v1.ResourceList) {
 	}
 }
 
-// podRequests calculates the max between the sum of container resources and max of initContainers along with sidecar feature consideration
-// inspired from https://github.com/kubernetes/kubernetes/blob/e2afa175e4077d767745246662170acd86affeaf/pkg/api/v1/resource/helpers.go#L96
-// https://kubernetes.io/blog/2023/08/25/native-sidecar-containers/
-func podRequests(pod *v1.Pod) v1.ResourceList {
-	requests := v1.ResourceList{}
-	restartableInitContainerReqs := v1.ResourceList{}
-	maxInitContainerReqs := v1.ResourceList{}
-
-	for _, container := range pod.Spec.Containers {
-		MergeInto(requests, MergeResourceLimitsIntoRequests(container))
-	}
-
-	for _, container := range pod.Spec.InitContainers {
-		containerReqs := MergeResourceLimitsIntoRequests(container)
-		// If the init container's policy is "Always", then we need to add this container's requests to the total requests. We also need to track this container's request as the required requests for other initContainers
-		if lo.FromPtr(container.RestartPolicy) == v1.ContainerRestartPolicyAlways {
-			MergeInto(requests, containerReqs)
-			MergeInto(restartableInitContainerReqs, containerReqs)
-			maxInitContainerReqs = MaxResources(maxInitContainerReqs, restartableInitContainerReqs)
-
-		} else {
-			// Else, check whether the current container's resource requests combined with the restartableInitContainer requests are greater than the current max
-			maxInitContainerReqs = MaxResources(maxInitContainerReqs, Merge(containerReqs, restartableInitContainerReqs))
-		}
-	}
-	// The container's needed requests are the max of all of the container requests combined with native sidecar container requests OR the requests required for a large init containers with native sidecar container requests to run
-	requests = MaxResources(requests, maxInitContainerReqs)
-
-	if pod.Spec.Overhead != nil {
-		MergeInto(requests, pod.Spec.Overhead)
-	}
-
-	return requests
-}
-
-// podLimits calculates the max between the sum of container resources and max of initContainers along with sidecar feature consideration
-// inspired from https://github.com/kubernetes/kubernetes/blob/e2afa175e4077d767745246662170acd86affeaf/pkg/api/v1/resource/helpers.go#L96
-// https://kubernetes.io/blog/2023/08/25/native-sidecar-containers/
-func podLimits(pod *v1.Pod) v1.ResourceList {
-	limits := v1.ResourceList{}
-	restartableInitContainerLimits := v1.ResourceList{}
-	maxInitContainerLimits := v1.ResourceList{}
-
-	for _, container := range pod.Spec.Containers {
-		MergeInto(limits, container.Resources.Limits)
-	}
-
-	for _, container := range pod.Spec.InitContainers {
-		// If the init container's policy is "Always", then we need to add this container's limits to the total limits. We also need to track this container's limit as the required limits for other initContainers
-		if lo.FromPtr(container.RestartPolicy) == v1.ContainerRestartPolicyAlways {
-			MergeInto(limits, container.Resources.Limits)
-			MergeInto(restartableInitContainerLimits, container.Resources.Limits)
-			maxInitContainerLimits = MaxResources(maxInitContainerLimits, restartableInitContainerLimits)
-		} else {
-			// Else, check whether the current container's resource limits combined with the restartableInitContainer limits are greater than the current max
-			maxInitContainerLimits = MaxResources(maxInitContainerLimits, Merge(container.Resources.Limits, restartableInitContainerLimits))
-		}
-	}
-	// The container's needed limits are the max of all of the container limits combined with native sidecar container limits OR the limits required for a large init containers with native sidecar container limits to run
-	limits = MaxResources(limits, maxInitContainerLimits)
-
-	if pod.Spec.Overhead != nil {
-		MergeInto(limits, pod.Spec.Overhead)
-	}
-
-	return limits
-}
-
+// Ceiling computes the effective resource requirements for a given Pod,
+// using the same logic as the scheduler.
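+// Pod-level requests and limits, when specified, take precedence over the
+// aggregated container values for supported resources (CPU and memory);
+// pod overhead is still added on top.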
 func Ceiling(pod *v1.Pod) v1.ResourceRequirements {
 	return v1.ResourceRequirements{
-		Requests: podRequests(pod),
-		Limits:   podLimits(pod),
+		Requests: resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{}),
+		Limits:   resourcehelper.PodLimits(pod, resourcehelper.PodResourcesOptions{}),
 	}
 }
@@ -196,22 +130,6 @@ func MaxResources(resources ...v1.ResourceList) v1.ResourceList {
 	return resourceList
 }
 
-// MergeResourceLimitsIntoRequests merges resource limits into requests if no request exists for the given resource
-func MergeResourceLimitsIntoRequests(container v1.Container) v1.ResourceList {
-	ret := v1.ResourceList{}
-	for resourceName, quantity := range container.Resources.Requests {
-		ret[resourceName] = quantity
-	}
-	if container.Resources.Limits != nil {
-		for resourceName, quantity := range container.Resources.Limits {
-			if _, ok := container.Resources.Requests[resourceName]; !ok {
-				ret[resourceName] = quantity
-			}
-		}
-	}
-	return ret
-}
-
 // Quantity parses the string value into a *Quantity
 func Quantity(value string) *resource.Quantity {
 	r := resource.MustParse(value)
diff --git a/pkg/utils/resources/suite_test.go b/pkg/utils/resources/suite_test.go
index d3cb46c5f2..1fac7cca4e 100644
--- a/pkg/utils/resources/suite_test.go
+++ b/pkg/utils/resources/suite_test.go
@@ -565,37 +565,86 @@ var _ = Describe("Resources", func() {
 			})
 		})
 	})
-	})
-	Context("Resource Merging", func() {
-		It("should merge resource limits into requests if no request exists for the given container", func() {
-			container := v1.Container{
-				Resources: v1.ResourceRequirements{
-					Limits: v1.ResourceList{
-						v1.ResourceCPU:    resource.MustParse("2"),
-						v1.ResourceMemory: resource.MustParse("1Gi"),
+	Context("Pod Level Resources", func() {
+		It("should calculate resource requests when pod level resources are specified", func() {
+			pod := test.Pod(test.PodOptions{
+				PodResourceRequirements: v1.ResourceRequirements{
+					Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("4"), v1.ResourceMemory: resource.MustParse("4Gi")},
+					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("2Gi")},
 				},
-			}
-			requests := resources.MergeResourceLimitsIntoRequests(container)
-			ExpectResources(requests, v1.ResourceList{
-				v1.ResourceCPU:    resource.MustParse("2"),
-				v1.ResourceMemory: resource.MustParse("1Gi"),
+				ResourceRequirements: v1.ResourceRequirements{
+					Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("3"), v1.ResourceMemory: resource.MustParse("3Gi")},
+					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+				},
+			})
+			podResources := resources.Ceiling(pod)
+			ExpectResources(podResources.Requests, v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse("2"),
+				v1.ResourceMemory: resource.MustParse("2Gi"),
+			})
+			ExpectResources(podResources.Limits, v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse("4"),
+				v1.ResourceMemory: resource.MustParse("4Gi"),
 			})
-		})
-		It("should merge resource limits into requests if no request exists for the given sidecarContainer", func() {
-			container := v1.Container{
-				RestartPolicy: lo.ToPtr(v1.ContainerRestartPolicyAlways),
-				Resources: v1.ResourceRequirements{
-					Limits: v1.ResourceList{
-						v1.ResourceCPU:    resource.MustParse("2"),
-						v1.ResourceMemory: resource.MustParse("1Gi"),
+		})
+		It("should calculate resource requests when only the pod level resources requests are specified", func() {
+			pod := test.Pod(test.PodOptions{
+				PodResourceRequirements: v1.ResourceRequirements{
+					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("2Gi")},
 					},
-				},
-			}
-			requests := resources.MergeResourceLimitsIntoRequests(container)
-			ExpectResources(requests, v1.ResourceList{
-				v1.ResourceCPU:    resource.MustParse("2"),
-				v1.ResourceMemory: resource.MustParse("1Gi"),
+				ResourceRequirements: v1.ResourceRequirements{
+					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+					Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+				},
+				InitContainers: []v1.Container{
+					{
+						RestartPolicy: lo.ToPtr(v1.ContainerRestartPolicyAlways),
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+							Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+						},
+					},
+				},
+			})
+			podResources := resources.Ceiling(pod)
+			ExpectResources(podResources.Requests, v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse("2"),
+				v1.ResourceMemory: resource.MustParse("2Gi"),
+			})
+			ExpectResources(podResources.Limits, v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse("2"),
+				v1.ResourceMemory: resource.MustParse("2Gi"),
+			})
+		})
+		It("should calculate resource requests when the pod level resources requests are defaulted from limits", func() {
+			pod := test.Pod(test.PodOptions{
+				PodResourceRequirements: v1.ResourceRequirements{
+					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10"), v1.ResourceMemory: resource.MustParse("10Gi")}, // simulate the API server’s defaulting from limits
+					Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("10"), v1.ResourceMemory: resource.MustParse("10Gi")},
+				},
+				ResourceRequirements: v1.ResourceRequirements{
+					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+					Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+				},
+				InitContainers: []v1.Container{
+					{
+						RestartPolicy: lo.ToPtr(v1.ContainerRestartPolicyAlways),
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+							Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
+						},
+					},
+				},
+			})
+			podResources := resources.Ceiling(pod)
+			ExpectResources(podResources.Requests, v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse("10"),
+				v1.ResourceMemory: resource.MustParse("10Gi"),
+			})
+			ExpectResources(podResources.Limits, v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse("10"),
+				v1.ResourceMemory: resource.MustParse("10Gi"),
+			})
+		})
 		})
 	})

From f9eb4d7d1189c246c54cc9d141f3c8883aa9beb5 Mon Sep 17 00:00:00 2001
From: Tsubasa Nagasawa
Date: Fri, 18 Jul 2025 17:52:40 +0900
Subject: [PATCH 2/2] fix: manually apply API server defaulting logic to pass existing tests

Signed-off-by: Tsubasa Nagasawa
---
 pkg/controllers/provisioning/suite_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go
index 451a2ab304..816d56407b 100644
--- a/pkg/controllers/provisioning/suite_test.go
+++ b/pkg/controllers/provisioning/suite_test.go
@@ -1016,7 +1016,7 @@ var _ = Describe("Provisioning", func() {
 				test.DaemonSetOptions{PodOptions: test.PodOptions{
 					ResourceRequirements: corev1.ResourceRequirements{
 						Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("10000"), corev1.ResourceMemory: resource.MustParse("10000Gi")},
-						Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")},
+						Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("10000Gi")}, // simulate the API server’s defaulting from limits
 					},
 				}},
 			))
@@ -1063,7 +1063,7 @@ var _ = Describe("Provisioning", func() {
 					{
 						Resources: corev1.ResourceRequirements{
 							Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("10000"), corev1.ResourceMemory: resource.MustParse("10000Gi")},
-							Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")},
+							Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("10000Gi")}, // simulate the API server’s defaulting from limits
 						},
 					},
 				},
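
For reference, a minimal standalone sketch (not part of this patch series) of the shared helper behavior the series relies on, assuming k8s.io/component-helpers v0.32 or newer (versions that understand pod-level resources): when pod-level requests are set, they take precedence over the aggregated container requests for supported resources.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	resourcehelper "k8s.io/component-helpers/resource"
)

func main() {
	// Container requests sum to 3 CPU, but the pod-level request of 2 CPU wins.
	pod := &corev1.Pod{
		Spec: corev1.PodSpec{
			Resources: &corev1.ResourceRequirements{
				Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")},
			},
			Containers: []corev1.Container{
				{Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")}}},
				{Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")}}},
			},
		},
	}
	reqs := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{})
	fmt.Println(reqs.Cpu().String()) // prints "2" (pod-level request), not "3" (container sum)
}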