diff --git a/pkg/controller/jobframework/tas.go b/pkg/controller/jobframework/tas.go index dedf11ca73b..e921df89fe8 100644 --- a/pkg/controller/jobframework/tas.go +++ b/pkg/controller/jobframework/tas.go @@ -27,36 +27,37 @@ import ( ) type podSetTopologyRequestBuilder struct { - request *kueue.PodSetTopologyRequest -} - -func (p *podSetTopologyRequestBuilder) Build() *kueue.PodSetTopologyRequest { - return p.request + podIndexLabel *string + subGroupIndexLabel *string + subGroupCount *int32 + meta *metav1.ObjectMeta } func (p *podSetTopologyRequestBuilder) PodIndexLabel(podIndexLabel *string) *podSetTopologyRequestBuilder { - if p.request != nil { - p.request.PodIndexLabel = podIndexLabel - } + p.podIndexLabel = podIndexLabel return p } func (p *podSetTopologyRequestBuilder) SubGroup(subGroupIndexLabel *string, subGroupCount *int32) *podSetTopologyRequestBuilder { - if p.request != nil { - p.request.SubGroupIndexLabel = subGroupIndexLabel - p.request.SubGroupCount = subGroupCount - } + p.subGroupIndexLabel = subGroupIndexLabel + p.subGroupCount = subGroupCount return p } func NewPodSetTopologyRequest(meta *metav1.ObjectMeta) *podSetTopologyRequestBuilder { - psTopologyReq := &kueue.PodSetTopologyRequest{} - requiredValue, requiredFound := meta.Annotations[kueuealpha.PodSetRequiredTopologyAnnotation] - preferredValue, preferredFound := meta.Annotations[kueuealpha.PodSetPreferredTopologyAnnotation] - unconstrained, unconstrainedFound := meta.Annotations[kueuealpha.PodSetUnconstrainedTopologyAnnotation] + return &podSetTopologyRequestBuilder{ + meta: meta, + } +} + +func (p *podSetTopologyRequestBuilder) Build() (*kueue.PodSetTopologyRequest, error) { + psTopologyReq := kueue.PodSetTopologyRequest{} + requiredValue, requiredFound := p.meta.Annotations[kueuealpha.PodSetRequiredTopologyAnnotation] + preferredValue, preferredFound := p.meta.Annotations[kueuealpha.PodSetPreferredTopologyAnnotation] + unconstrained, unconstrainedFound := p.meta.Annotations[kueuealpha.PodSetUnconstrainedTopologyAnnotation] - sliceRequiredTopologyValue, sliceRequiredTopologyFound := meta.Annotations[kueuealpha.PodSetSliceRequiredTopologyAnnotation] - sliceSizeValue, sliceSizeFound := meta.Annotations[kueuealpha.PodSetSliceSizeAnnotation] + sliceRequiredTopologyValue, sliceRequiredTopologyFound := p.meta.Annotations[kueuealpha.PodSetSliceRequiredTopologyAnnotation] + sliceSizeValue, sliceSizeFound := p.meta.Annotations[kueuealpha.PodSetSliceSizeAnnotation] switch { case requiredFound: @@ -64,22 +65,28 @@ func NewPodSetTopologyRequest(meta *metav1.ObjectMeta) *podSetTopologyRequestBui case preferredFound: psTopologyReq.Preferred = &preferredValue case unconstrainedFound: - unconstrained, _ := strconv.ParseBool(unconstrained) + unconstrained, err := strconv.ParseBool(unconstrained) + if err != nil { + return nil, err + } psTopologyReq.Unconstrained = &unconstrained default: - psTopologyReq = nil + return nil, nil } if sliceRequiredTopologyFound && sliceSizeFound { sliceSizeIntValue, err := strconv.ParseInt(sliceSizeValue, 10, 32) if err != nil { - // silently ignore as it should not happen due to earlier validation in a webhook + return nil, err } else { psTopologyReq.PodSetSliceRequiredTopology = &sliceRequiredTopologyValue psTopologyReq.PodSetSliceSize = ptr.To(int32(sliceSizeIntValue)) } } - builder := &podSetTopologyRequestBuilder{request: psTopologyReq} - return builder + psTopologyReq.PodIndexLabel = p.podIndexLabel + psTopologyReq.SubGroupCount = p.subGroupCount + psTopologyReq.SubGroupIndexLabel = p.subGroupIndexLabel + + return &psTopologyReq, nil } diff --git a/pkg/controller/jobs/appwrapper/appwrapper_controller.go b/pkg/controller/jobs/appwrapper/appwrapper_controller.go index 2c6300673d9..c3c7c6fc7b0 100644 --- a/pkg/controller/jobs/appwrapper/appwrapper_controller.go +++ b/pkg/controller/jobs/appwrapper/appwrapper_controller.go @@ -147,9 +147,13 @@ func (j *AppWrapper) PodSets() ([]kueue.PodSet, error) { Count: awutils.Replicas(awPodSets[psIndex]), } if features.Enabled(features.TopologyAwareScheduling) { - podSets[psIndex].TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &(podSpecTemplates[psIndex].ObjectMeta)). PodIndexLabel(podIndexLabel).SubGroup(subGroupIndexLabel, subGroupCount).Build() + if err != nil { + return nil, err + } + podSets[psIndex].TopologyRequest = topologyRequest } } return podSets, nil diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index 30a0704d928..c8cc2649569 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -251,9 +251,13 @@ func (j *Job) PodSets() ([]kueue.PodSet, error) { MinCount: j.minPodsCount(), } if features.Enabled(features.TopologyAwareScheduling) { - podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &j.Spec.Template.ObjectMeta).PodIndexLabel( ptr.To(batchv1.JobCompletionIndexAnnotation)).Build() + if err != nil { + return nil, err + } + podSet.TopologyRequest = topologyRequest } return []kueue.PodSet{ podSet, diff --git a/pkg/controller/jobs/jobset/jobset_controller.go b/pkg/controller/jobs/jobset/jobset_controller.go index b84cad3a0ad..57a44a929d8 100644 --- a/pkg/controller/jobs/jobset/jobset_controller.go +++ b/pkg/controller/jobs/jobset/jobset_controller.go @@ -120,11 +120,15 @@ func (j *JobSet) PodSets() ([]kueue.PodSet, error) { Count: podsCount(&replicatedJob), } if features.Enabled(features.TopologyAwareScheduling) { - podSets[index].TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &replicatedJob.Template.Spec.Template.ObjectMeta).PodIndexLabel( ptr.To(batchv1.JobCompletionIndexAnnotation)).SubGroup( ptr.To(jobsetapi.JobIndexKey), ptr.To(replicatedJob.Replicas)).Build() + if err != nil { + return nil, err + } + podSets[index].TopologyRequest = topologyRequest } } return podSets, nil diff --git a/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_controller.go b/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_controller.go index 2cbe58dab52..1e2885068f7 100644 --- a/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_controller.go +++ b/pkg/controller/jobs/kubeflow/kubeflowjob/kubeflowjob_controller.go @@ -108,9 +108,13 @@ func (j *KubeflowJob) PodSets() ([]kueue.PodSet, error) { Count: podsCount(j.KFJobControl.ReplicaSpecs(), replicaType), } if features.Enabled(features.TopologyAwareScheduling) { - podSets[index].TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &j.KFJobControl.ReplicaSpecs()[replicaType].Template.ObjectMeta).PodIndexLabel( ptr.To(kftraining.ReplicaIndexLabel)).Build() + if err != nil { + return nil, err + } + podSets[index].TopologyRequest = topologyRequest } } return podSets, nil diff --git a/pkg/controller/jobs/leaderworkerset/leaderworkerset_reconciler.go b/pkg/controller/jobs/leaderworkerset/leaderworkerset_reconciler.go index cce69b451ee..872f5528705 100644 --- a/pkg/controller/jobs/leaderworkerset/leaderworkerset_reconciler.go +++ b/pkg/controller/jobs/leaderworkerset/leaderworkerset_reconciler.go @@ -184,14 +184,18 @@ func (r *Reconciler) createPrebuiltWorkload(ctx context.Context, lws *leaderwork } func (r *Reconciler) constructWorkload(lws *leaderworkersetv1.LeaderWorkerSet, workloadName string) (*kueue.Workload, error) { - createdWorkload := podcontroller.NewGroupWorkload(workloadName, lws, r.podSets(lws), r.labelKeysToCopy) + podSets, err := r.podSets(lws) + if err != nil { + return nil, err + } + createdWorkload := podcontroller.NewGroupWorkload(workloadName, lws, podSets, r.labelKeysToCopy) if err := controllerutil.SetOwnerReference(lws, createdWorkload, r.client.Scheme()); err != nil { return nil, err } return createdWorkload, nil } -func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) []kueue.PodSet { +func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) ([]kueue.PodSet, error) { podSets := make([]kueue.PodSet, 0, 2) if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil { @@ -203,8 +207,12 @@ func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) []kueue.Pod }, } if features.Enabled(features.TopologyAwareScheduling) { - podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &lws.Spec.LeaderWorkerTemplate.LeaderTemplate.ObjectMeta).Build() + if err != nil { + return nil, err + } + podSet.TopologyRequest = topologyRequest } podSets = append(podSets, podSet) } @@ -228,14 +236,18 @@ func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) []kueue.Pod } if features.Enabled(features.TopologyAwareScheduling) { - podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &lws.Spec.LeaderWorkerTemplate.WorkerTemplate.ObjectMeta).PodIndexLabel( ptr.To(leaderworkersetv1.WorkerIndexLabelKey)).Build() + if err != nil { + return nil, err + } + podSet.TopologyRequest = topologyRequest } podSets = append(podSets, podSet) - return podSets + return podSets, nil } func (r *Reconciler) removeOwnerReference(ctx context.Context, lws *leaderworkersetv1.LeaderWorkerSet, wl *kueue.Workload) error { diff --git a/pkg/controller/jobs/mpijob/mpijob_controller.go b/pkg/controller/jobs/mpijob/mpijob_controller.go index be604d4985d..c752c310db2 100644 --- a/pkg/controller/jobs/mpijob/mpijob_controller.go +++ b/pkg/controller/jobs/mpijob/mpijob_controller.go @@ -119,9 +119,13 @@ func (j *MPIJob) PodSets() ([]kueue.PodSet, error) { Count: podsCount(&j.Spec, mpiReplicaType), } if features.Enabled(features.TopologyAwareScheduling) { - podSets[index].TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &j.Spec.MPIReplicaSpecs[mpiReplicaType].Template.ObjectMeta).PodIndexLabel( ptr.To(kfmpi.ReplicaIndexLabel)).Build() + if err != nil { + return nil, err + } + podSets[index].TopologyRequest = topologyRequest } } return podSets, nil diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 3133daa0cae..b3857a2f6ff 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -385,7 +385,7 @@ func (p *Pod) Finished() (message string, success, finished bool) { // PodSets will build workload podSets corresponding to the job. func (p *Pod) PodSets() ([]kueue.PodSet, error) { if !p.isGroup { - return constructPodSets(&p.pod), nil + return constructPodSets(&p.pod) } else { return p.constructGroupPodSets() } @@ -642,13 +642,17 @@ func (p *Pod) constructGroupPodSets() ([]kueue.PodSet, error) { return constructGroupPodSets(p.list.Items) } -func constructPodSets(p *corev1.Pod) []kueue.PodSet { - return []kueue.PodSet{ - constructPodSet(p), +func constructPodSets(p *corev1.Pod) ([]kueue.PodSet, error) { + podSet, err := constructPodSet(p) + if err != nil { + return nil, err } + return []kueue.PodSet{ + podSet, + }, nil } -func constructPodSet(p *corev1.Pod) kueue.PodSet { +func constructPodSet(p *corev1.Pod) (kueue.PodSet, error) { podSet := kueue.PodSet{ Name: kueue.DefaultPodSetName, Count: 1, @@ -657,11 +661,15 @@ func constructPodSet(p *corev1.Pod) kueue.PodSet { }, } if features.Enabled(features.TopologyAwareScheduling) { - podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &p.ObjectMeta).PodIndexLabel( ptr.To(kueuealpha.PodGroupPodIndexLabel)).Build() + if err != nil { + return kueue.PodSet{}, err + } + podSet.TopologyRequest = topologyRequest } - return podSet + return podSet, nil } func constructGroupPodSetsFast(pods []corev1.Pod, groupTotalCount int) ([]kueue.PodSet, error) { @@ -673,7 +681,10 @@ func constructGroupPodSetsFast(pods []corev1.Pod, groupTotalCount int) ([]kueue. if err != nil { return nil, fmt.Errorf("failed to calculate pod role hash: %w", err) } - podSets := constructPodSets(&podInGroup) + podSets, err := constructPodSets(&podInGroup) + if err != nil { + return nil, err + } podSets[0].Name = kueue.NewPodSetReference(roleHash) podSets[0].Count = int32(groupTotalCount) return podSets, nil @@ -705,7 +716,10 @@ func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) { } if !podRoleFound { - podSet := constructPodSet(&podInGroup) + podSet, err := constructPodSet(&podInGroup) + if err != nil { + return nil, err + } podSet.Name = kueue.NewPodSetReference(roleHash) resultPodSets = append(resultPodSets, podSet) diff --git a/pkg/controller/jobs/raycluster/raycluster_controller.go b/pkg/controller/jobs/raycluster/raycluster_controller.go index 3a7dde7b172..baf700805b7 100644 --- a/pkg/controller/jobs/raycluster/raycluster_controller.go +++ b/pkg/controller/jobs/raycluster/raycluster_controller.go @@ -113,8 +113,12 @@ func (j *RayCluster) PodSets() ([]kueue.PodSet, error) { } if features.Enabled(features.TopologyAwareScheduling) { - podSets[0].TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &j.Spec.HeadGroupSpec.Template.ObjectMeta).Build() + if err != nil { + return nil, err + } + podSets[0].TopologyRequest = topologyRequest } // workers @@ -133,8 +137,12 @@ func (j *RayCluster) PodSets() ([]kueue.PodSet, error) { Count: count, } if features.Enabled(features.TopologyAwareScheduling) { - podSets[index+1].TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &wgs.Template.ObjectMeta).Build() + if err != nil { + return nil, err + } + podSets[index+1].TopologyRequest = topologyRequest } } return podSets, nil diff --git a/pkg/controller/jobs/rayjob/rayjob_controller.go b/pkg/controller/jobs/rayjob/rayjob_controller.go index 38cf765cb28..2c2f39fd0d1 100644 --- a/pkg/controller/jobs/rayjob/rayjob_controller.go +++ b/pkg/controller/jobs/rayjob/rayjob_controller.go @@ -121,8 +121,12 @@ func (j *RayJob) PodSets() ([]kueue.PodSet, error) { Count: 1, } if features.Enabled(features.TopologyAwareScheduling) { - headPodSet.TopologyRequest = jobframework.NewPodSetTopologyRequest( + topologyRequest, err := jobframework.NewPodSetTopologyRequest( &j.Spec.RayClusterSpec.HeadGroupSpec.Template.ObjectMeta).Build() + if err != nil { + return nil, err + } + headPodSet.TopologyRequest = topologyRequest } podSets = append(podSets, headPodSet) @@ -142,7 +146,11 @@ func (j *RayJob) PodSets() ([]kueue.PodSet, error) { Count: count, } if features.Enabled(features.TopologyAwareScheduling) { - workerPodSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(&wgs.Template.ObjectMeta).Build() + topologyRequest, err := jobframework.NewPodSetTopologyRequest(&wgs.Template.ObjectMeta).Build() + if err != nil { + return nil, err + } + workerPodSet.TopologyRequest = topologyRequest } podSets = append(podSets, workerPodSet) } @@ -158,7 +166,11 @@ func (j *RayJob) PodSets() ([]kueue.PodSet, error) { // Create the TopologyRequest for the Submitter Job PodSet, based on the annotations // in rayJob.Spec.SubmitterPodTemplate, which can be specified by the user. if features.Enabled(features.TopologyAwareScheduling) { - submitterJobPodSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(&submitterJobPodSet.Template.ObjectMeta).Build() + topologyRequest, err := jobframework.NewPodSetTopologyRequest(&submitterJobPodSet.Template.ObjectMeta).Build() + if err != nil { + return nil, err + } + submitterJobPodSet.TopologyRequest = topologyRequest } podSets = append(podSets, submitterJobPodSet) }