Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions pkg/controller/jobframework/tas.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,59 +27,66 @@ import (
)

type podSetTopologyRequestBuilder struct {
request *kueue.PodSetTopologyRequest
}

func (p *podSetTopologyRequestBuilder) Build() *kueue.PodSetTopologyRequest {
return p.request
podIndexLabel *string
subGroupIndexLabel *string
subGroupCount *int32
meta *metav1.ObjectMeta
}

func (p *podSetTopologyRequestBuilder) PodIndexLabel(podIndexLabel *string) *podSetTopologyRequestBuilder {
if p.request != nil {
p.request.PodIndexLabel = podIndexLabel
}
p.podIndexLabel = podIndexLabel
return p
}

func (p *podSetTopologyRequestBuilder) SubGroup(subGroupIndexLabel *string, subGroupCount *int32) *podSetTopologyRequestBuilder {
if p.request != nil {
p.request.SubGroupIndexLabel = subGroupIndexLabel
p.request.SubGroupCount = subGroupCount
}
p.subGroupIndexLabel = subGroupIndexLabel
p.subGroupCount = subGroupCount
return p
}

func NewPodSetTopologyRequest(meta *metav1.ObjectMeta) *podSetTopologyRequestBuilder {
psTopologyReq := &kueue.PodSetTopologyRequest{}
requiredValue, requiredFound := meta.Annotations[kueuealpha.PodSetRequiredTopologyAnnotation]
preferredValue, preferredFound := meta.Annotations[kueuealpha.PodSetPreferredTopologyAnnotation]
unconstrained, unconstrainedFound := meta.Annotations[kueuealpha.PodSetUnconstrainedTopologyAnnotation]
return &podSetTopologyRequestBuilder{
meta: meta,
}
}

func (p *podSetTopologyRequestBuilder) Build() (*kueue.PodSetTopologyRequest, error) {
psTopologyReq := kueue.PodSetTopologyRequest{}
requiredValue, requiredFound := p.meta.Annotations[kueuealpha.PodSetRequiredTopologyAnnotation]
preferredValue, preferredFound := p.meta.Annotations[kueuealpha.PodSetPreferredTopologyAnnotation]
unconstrained, unconstrainedFound := p.meta.Annotations[kueuealpha.PodSetUnconstrainedTopologyAnnotation]

sliceRequiredTopologyValue, sliceRequiredTopologyFound := meta.Annotations[kueuealpha.PodSetSliceRequiredTopologyAnnotation]
sliceSizeValue, sliceSizeFound := meta.Annotations[kueuealpha.PodSetSliceSizeAnnotation]
sliceRequiredTopologyValue, sliceRequiredTopologyFound := p.meta.Annotations[kueuealpha.PodSetSliceRequiredTopologyAnnotation]
sliceSizeValue, sliceSizeFound := p.meta.Annotations[kueuealpha.PodSetSliceSizeAnnotation]

switch {
case requiredFound:
psTopologyReq.Required = &requiredValue
case preferredFound:
psTopologyReq.Preferred = &preferredValue
case unconstrainedFound:
unconstrained, _ := strconv.ParseBool(unconstrained)
unconstrained, err := strconv.ParseBool(unconstrained)
if err != nil {
return nil, err
}
psTopologyReq.Unconstrained = &unconstrained
default:
psTopologyReq = nil
return nil, nil
}

if sliceRequiredTopologyFound && sliceSizeFound {
sliceSizeIntValue, err := strconv.ParseInt(sliceSizeValue, 10, 32)
if err != nil {
// silently ignore as it should not happen due to earlier validation in a webhook
return nil, err
} else {
psTopologyReq.PodSetSliceRequiredTopology = &sliceRequiredTopologyValue
psTopologyReq.PodSetSliceSize = ptr.To(int32(sliceSizeIntValue))
}
}

builder := &podSetTopologyRequestBuilder{request: psTopologyReq}
return builder
psTopologyReq.PodIndexLabel = p.podIndexLabel
psTopologyReq.SubGroupCount = p.subGroupCount
psTopologyReq.SubGroupIndexLabel = p.subGroupIndexLabel

return &psTopologyReq, nil
}
6 changes: 5 additions & 1 deletion pkg/controller/jobs/appwrapper/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,13 @@ func (j *AppWrapper) PodSets() ([]kueue.PodSet, error) {
Count: awutils.Replicas(awPodSets[psIndex]),
}
if features.Enabled(features.TopologyAwareScheduling) {
podSets[psIndex].TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&(podSpecTemplates[psIndex].ObjectMeta)).
PodIndexLabel(podIndexLabel).SubGroup(subGroupIndexLabel, subGroupCount).Build()
if err != nil {
return nil, err
}
podSets[psIndex].TopologyRequest = topologyRequest
}
}
return podSets, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,13 @@ func (j *Job) PodSets() ([]kueue.PodSet, error) {
MinCount: j.minPodsCount(),
}
if features.Enabled(features.TopologyAwareScheduling) {
podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&j.Spec.Template.ObjectMeta).PodIndexLabel(
ptr.To(batchv1.JobCompletionIndexAnnotation)).Build()
if err != nil {
return nil, err
}
podSet.TopologyRequest = topologyRequest
}
return []kueue.PodSet{
podSet,
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/jobs/jobset/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,15 @@ func (j *JobSet) PodSets() ([]kueue.PodSet, error) {
Count: podsCount(&replicatedJob),
}
if features.Enabled(features.TopologyAwareScheduling) {
podSets[index].TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&replicatedJob.Template.Spec.Template.ObjectMeta).PodIndexLabel(
ptr.To(batchv1.JobCompletionIndexAnnotation)).SubGroup(
ptr.To(jobsetapi.JobIndexKey),
ptr.To(replicatedJob.Replicas)).Build()
if err != nil {
return nil, err
}
podSets[index].TopologyRequest = topologyRequest
}
}
return podSets, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ func (j *KubeflowJob) PodSets() ([]kueue.PodSet, error) {
Count: podsCount(j.KFJobControl.ReplicaSpecs(), replicaType),
}
if features.Enabled(features.TopologyAwareScheduling) {
podSets[index].TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&j.KFJobControl.ReplicaSpecs()[replicaType].Template.ObjectMeta).PodIndexLabel(
ptr.To(kftraining.ReplicaIndexLabel)).Build()
if err != nil {
return nil, err
}
podSets[index].TopologyRequest = topologyRequest
}
}
return podSets, nil
Expand Down
22 changes: 17 additions & 5 deletions pkg/controller/jobs/leaderworkerset/leaderworkerset_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,18 @@ func (r *Reconciler) createPrebuiltWorkload(ctx context.Context, lws *leaderwork
}

func (r *Reconciler) constructWorkload(lws *leaderworkersetv1.LeaderWorkerSet, workloadName string) (*kueue.Workload, error) {
createdWorkload := podcontroller.NewGroupWorkload(workloadName, lws, r.podSets(lws), r.labelKeysToCopy)
podSets, err := r.podSets(lws)
if err != nil {
return nil, err
}
createdWorkload := podcontroller.NewGroupWorkload(workloadName, lws, podSets, r.labelKeysToCopy)
if err := controllerutil.SetOwnerReference(lws, createdWorkload, r.client.Scheme()); err != nil {
return nil, err
}
return createdWorkload, nil
}

func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) []kueue.PodSet {
func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) ([]kueue.PodSet, error) {
podSets := make([]kueue.PodSet, 0, 2)

if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
Expand All @@ -203,8 +207,12 @@ func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) []kueue.Pod
},
}
if features.Enabled(features.TopologyAwareScheduling) {
podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&lws.Spec.LeaderWorkerTemplate.LeaderTemplate.ObjectMeta).Build()
if err != nil {
return nil, err
}
podSet.TopologyRequest = topologyRequest
}
podSets = append(podSets, podSet)
}
Expand All @@ -228,14 +236,18 @@ func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) []kueue.Pod
}

if features.Enabled(features.TopologyAwareScheduling) {
podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&lws.Spec.LeaderWorkerTemplate.WorkerTemplate.ObjectMeta).PodIndexLabel(
ptr.To(leaderworkersetv1.WorkerIndexLabelKey)).Build()
if err != nil {
return nil, err
}
podSet.TopologyRequest = topologyRequest
}

podSets = append(podSets, podSet)

return podSets
return podSets, nil
}

func (r *Reconciler) removeOwnerReference(ctx context.Context, lws *leaderworkersetv1.LeaderWorkerSet, wl *kueue.Workload) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/jobs/mpijob/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ func (j *MPIJob) PodSets() ([]kueue.PodSet, error) {
Count: podsCount(&j.Spec, mpiReplicaType),
}
if features.Enabled(features.TopologyAwareScheduling) {
podSets[index].TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&j.Spec.MPIReplicaSpecs[mpiReplicaType].Template.ObjectMeta).PodIndexLabel(
ptr.To(kfmpi.ReplicaIndexLabel)).Build()
if err != nil {
return nil, err
}
podSets[index].TopologyRequest = topologyRequest
}
}
return podSets, nil
Expand Down
32 changes: 23 additions & 9 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (p *Pod) Finished() (message string, success, finished bool) {
// PodSets will build workload podSets corresponding to the job.
func (p *Pod) PodSets() ([]kueue.PodSet, error) {
if !p.isGroup {
return constructPodSets(&p.pod), nil
return constructPodSets(&p.pod)
} else {
return p.constructGroupPodSets()
}
Expand Down Expand Up @@ -642,13 +642,17 @@ func (p *Pod) constructGroupPodSets() ([]kueue.PodSet, error) {
return constructGroupPodSets(p.list.Items)
}

func constructPodSets(p *corev1.Pod) []kueue.PodSet {
return []kueue.PodSet{
constructPodSet(p),
func constructPodSets(p *corev1.Pod) ([]kueue.PodSet, error) {
podSet, err := constructPodSet(p)
if err != nil {
return nil, err
}
return []kueue.PodSet{
podSet,
}, nil
}

func constructPodSet(p *corev1.Pod) kueue.PodSet {
func constructPodSet(p *corev1.Pod) (kueue.PodSet, error) {
podSet := kueue.PodSet{
Name: kueue.DefaultPodSetName,
Count: 1,
Expand All @@ -657,11 +661,15 @@ func constructPodSet(p *corev1.Pod) kueue.PodSet {
},
}
if features.Enabled(features.TopologyAwareScheduling) {
podSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&p.ObjectMeta).PodIndexLabel(
ptr.To(kueuealpha.PodGroupPodIndexLabel)).Build()
if err != nil {
return kueue.PodSet{}, err
}
podSet.TopologyRequest = topologyRequest
}
return podSet
return podSet, nil
}

func constructGroupPodSetsFast(pods []corev1.Pod, groupTotalCount int) ([]kueue.PodSet, error) {
Expand All @@ -673,7 +681,10 @@ func constructGroupPodSetsFast(pods []corev1.Pod, groupTotalCount int) ([]kueue.
if err != nil {
return nil, fmt.Errorf("failed to calculate pod role hash: %w", err)
}
podSets := constructPodSets(&podInGroup)
podSets, err := constructPodSets(&podInGroup)
if err != nil {
return nil, err
}
podSets[0].Name = kueue.NewPodSetReference(roleHash)
podSets[0].Count = int32(groupTotalCount)
return podSets, nil
Expand Down Expand Up @@ -705,7 +716,10 @@ func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) {
}

if !podRoleFound {
podSet := constructPodSet(&podInGroup)
podSet, err := constructPodSet(&podInGroup)
if err != nil {
return nil, err
}
podSet.Name = kueue.NewPodSetReference(roleHash)

resultPodSets = append(resultPodSets, podSet)
Expand Down
12 changes: 10 additions & 2 deletions pkg/controller/jobs/raycluster/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,12 @@ func (j *RayCluster) PodSets() ([]kueue.PodSet, error) {
}

if features.Enabled(features.TopologyAwareScheduling) {
podSets[0].TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&j.Spec.HeadGroupSpec.Template.ObjectMeta).Build()
if err != nil {
return nil, err
}
podSets[0].TopologyRequest = topologyRequest
}

// workers
Expand All @@ -133,8 +137,12 @@ func (j *RayCluster) PodSets() ([]kueue.PodSet, error) {
Count: count,
}
if features.Enabled(features.TopologyAwareScheduling) {
podSets[index+1].TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&wgs.Template.ObjectMeta).Build()
if err != nil {
return nil, err
}
podSets[index+1].TopologyRequest = topologyRequest
}
}
return podSets, nil
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ func (j *RayJob) PodSets() ([]kueue.PodSet, error) {
Count: 1,
}
if features.Enabled(features.TopologyAwareScheduling) {
headPodSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(
topologyRequest, err := jobframework.NewPodSetTopologyRequest(
&j.Spec.RayClusterSpec.HeadGroupSpec.Template.ObjectMeta).Build()
if err != nil {
return nil, err
}
headPodSet.TopologyRequest = topologyRequest
}
podSets = append(podSets, headPodSet)

Expand All @@ -142,7 +146,11 @@ func (j *RayJob) PodSets() ([]kueue.PodSet, error) {
Count: count,
}
if features.Enabled(features.TopologyAwareScheduling) {
workerPodSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(&wgs.Template.ObjectMeta).Build()
topologyRequest, err := jobframework.NewPodSetTopologyRequest(&wgs.Template.ObjectMeta).Build()
if err != nil {
return nil, err
}
workerPodSet.TopologyRequest = topologyRequest
}
podSets = append(podSets, workerPodSet)
}
Expand All @@ -158,7 +166,11 @@ func (j *RayJob) PodSets() ([]kueue.PodSet, error) {
// Create the TopologyRequest for the Submitter Job PodSet, based on the annotations
// in rayJob.Spec.SubmitterPodTemplate, which can be specified by the user.
if features.Enabled(features.TopologyAwareScheduling) {
submitterJobPodSet.TopologyRequest = jobframework.NewPodSetTopologyRequest(&submitterJobPodSet.Template.ObjectMeta).Build()
topologyRequest, err := jobframework.NewPodSetTopologyRequest(&submitterJobPodSet.Template.ObjectMeta).Build()
if err != nil {
return nil, err
}
submitterJobPodSet.TopologyRequest = topologyRequest
}
podSets = append(podSets, submitterJobPodSet)
}
Expand Down