From 6e4184a67d959849d44e73abbcba821ebe1680a7 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Thu, 7 Aug 2025 00:29:19 -0700 Subject: [PATCH 01/12] modify batch scheduler interface to support CRD other than RayCluster Signed-off-by: Troy Chiu --- .../volcano/volcano_scheduler.go | 187 +++++++++++++----- 1 file changed, 134 insertions(+), 53 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 7bc8bda6e76..c772ff33736 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,8 +16,8 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - volcanov1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1" - volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1" + volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" @@ -42,39 +43,52 @@ func (v *VolcanoBatchScheduler) Name() string { return GetPluginName() } -func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error { - app, ok := object.(*rayv1.RayCluster) - if !ok { - return fmt.Errorf("currently only RayCluster is supported, got %T", object) - } - var minMember int32 - var totalResource corev1.ResourceList - if !utils.IsAutoscalingEnabled(&app.Spec) { - minMember = utils.CalculateDesiredReplicas(ctx, app) + 1 - totalResource = utils.CalculateDesiredResources(app) - } else { - minMember = utils.CalculateMinReplicas(app) + 1 - totalResource = utils.CalculateMinResources(app) +func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object client.Object) error { + switch obj := object.(type) { + case *rayv1.RayCluster: + return v.handleRayCluster(ctx, obj) + case *rayv1.RayJob: + return v.handleRayJob(ctx, obj) + default: + return fmt.Errorf("unsupported object type %T, only RayCluster and RayJob are supported", object) } +} - return v.syncPodGroup(ctx, app, minMember, totalResource) +func getAppPodGroupName(object client.Object) string { + // If the object is a RayCluster created by a RayJob, use the RayJob's name + if labels := object.GetLabels(); labels != nil { + if labels[utils.RayOriginatedFromCRDLabelKey] == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) { + if rayJobName, ok := labels[utils.RayOriginatedFromCRNameLabelKey]; ok { + return fmt.Sprintf("ray-%s-pg", rayJobName) + } + } + } + return fmt.Sprintf("ray-%s-pg", object.GetName()) } -func getAppPodGroupName(app *rayv1.RayCluster) string { - return fmt.Sprintf("ray-%s-pg", app.Name) +// copySchedulingLabels copies scheduling-related labels from source to target labels map. 
+func (v *VolcanoBatchScheduler) copySchedulingLabels(source client.Object, targetLabels map[string]string) { + if queue, ok := source.GetLabels()[QueueNameLabelKey]; ok { + targetLabels[QueueNameLabelKey] = queue + } + if priorityClassName, ok := source.GetLabels()[utils.RayPriorityClassName]; ok { + targetLabels[utils.RayPriorityClassName] = priorityClassName + } } -func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, app *rayv1.RayCluster, size int32, totalResource corev1.ResourceList) error { +// syncPodGroup ensures a Volcano PodGroup exists/updated for the given object +// with the provided size (MinMember) and total resources. +func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner client.Object, size int32, totalResource corev1.ResourceList) error { logger := ctrl.LoggerFrom(ctx).WithName(v.Name()) - podGroupName := getAppPodGroupName(app) - podGroup := volcanov1beta1.PodGroup{} - if err := v.cli.Get(ctx, types.NamespacedName{Namespace: app.Namespace, Name: podGroupName}, &podGroup); err != nil { + podGroupName := getAppPodGroupName(owner) + podGroup := volcanoschedulingv1beta1.PodGroup{} + if err := v.cli.Get(ctx, types.NamespacedName{Namespace: owner.GetNamespace(), Name: podGroupName}, &podGroup); err != nil { if !errors.IsNotFound(err) { return err } - podGroup := createPodGroup(app, podGroupName, size, totalResource) + podGroup := createPodGroup(owner, podGroupName, size, totalResource) if err := v.cli.Create(ctx, &podGroup); err != nil { if errors.IsAlreadyExists(err) { logger.Info("pod group already exists, no need to create") @@ -97,57 +111,124 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, app *rayv1.Ray return nil } -func createPodGroup( - app *rayv1.RayCluster, - podGroupName string, - size int32, - totalResource corev1.ResourceList, -) volcanov1beta1.PodGroup { - podGroup := volcanov1beta1.PodGroup{ +// calculatePodGroupParams calculates MinMember and MinResources for a RayCluster spec +func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, rayClusterSpec *rayv1.RayClusterSpec) (int32, corev1.ResourceList) { + rayCluster := &rayv1.RayCluster{Spec: *rayClusterSpec} + + if !utils.IsAutoscalingEnabled(rayClusterSpec) { + return utils.CalculateDesiredReplicas(ctx, rayCluster) + 1, utils.CalculateDesiredResources(rayCluster) + } + return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster) +} + +// handleRayCluster calculates the PodGroup MinMember and MinResources for a RayCluster +// and creates/updates the corresponding PodGroup unless the cluster originated from a RayJob. +func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster *rayv1.RayCluster) error { + // Check if this RayCluster is created by a RayJob, if so, skip PodGroup creation + if crdType, ok := raycluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok && crdType == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) { + return nil + } + + minMember, totalResource := v.calculatePodGroupParams(ctx, &raycluster.Spec) + + return v.syncPodGroup(ctx, raycluster, minMember, totalResource) +} + +// handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob +// based on its embedded RayCluster spec and creates/updates the corresponding PodGroup. +// Note: We intentionally do NOT include the submitter pod in MinMember since the RayCluster +// may not be ready yet. 
+func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error { + // For RayJob, we need to calculate resources based on the RayClusterSpec + // Not support using existing RayCluster + if rayJob.Spec.RayClusterSpec == nil { + return fmt.Errorf("RayJob %s/%s does not have RayClusterSpec defined", rayJob.Namespace, rayJob.Name) + } + + minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec) + + return v.syncPodGroup(ctx, rayJob, minMember, totalResource) +} + +// createPodGroup builds a Volcano PodGroup owned by the provided owner object. +func createPodGroup(owner client.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup { + var ownerRef metav1.OwnerReference + switch obj := owner.(type) { + case *rayv1.RayCluster: + ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayCluster")) + case *rayv1.RayJob: + ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayJob")) + } + + podGroup := volcanoschedulingv1beta1.PodGroup{ ObjectMeta: metav1.ObjectMeta{ - Namespace: app.Namespace, - Name: podGroupName, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(app, rayv1.SchemeGroupVersion.WithKind("RayCluster")), - }, + Namespace: owner.GetNamespace(), + Name: podGroupName, + OwnerReferences: []metav1.OwnerReference{ownerRef}, }, - Spec: volcanov1beta1.PodGroupSpec{ + Spec: volcanoschedulingv1beta1.PodGroupSpec{ MinMember: size, MinResources: &totalResource, }, - Status: volcanov1beta1.PodGroupStatus{ - Phase: volcanov1beta1.PodGroupPending, + Status: volcanoschedulingv1beta1.PodGroupStatus{ + Phase: volcanoschedulingv1beta1.PodGroupPending, }, } - if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok { + if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok { podGroup.Spec.Queue = queue } - if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok { + if priorityClassName, ok := owner.GetLabels()[utils.RayPriorityClassName]; ok { podGroup.Spec.PriorityClassName = priorityClassName } return podGroup } -func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) { - pod.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app) - pod.Annotations[volcanov1alpha1.TaskSpecKey] = groupName - if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok { - pod.Labels[QueueNameLabelKey] = queue - } - if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok { - pod.Spec.PriorityClassName = priorityClassName +// AddMetadataToChildResource enriches child resource with metadata necessary to tie it to the scheduler. +// For example, setting labels for queues / priority, and setting schedulerName. 
+func (v *VolcanoBatchScheduler) AddMetadataToChildResource(ctx context.Context, parent client.Object, groupName string, child client.Object) { + switch parentObj := parent.(type) { + case *rayv1.RayCluster: + v.AddMetadataToPod(ctx, parentObj, groupName, child.(*corev1.Pod)) + case *rayv1.RayJob: + switch childObj := child.(type) { + case *batchv1.Job: + v.addMetadataToSubmitterPod(ctx, parent.(*rayv1.RayJob), groupName, childObj) + } } +} + +func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) { + podGroupName := getAppPodGroupName(app) + + pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = podGroupName + pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName + + v.copySchedulingLabels(app, pod.Labels) pod.Spec.SchedulerName = v.Name() } -func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { +// addMetadataToSubmitterPod sets Volcano-related metadata on the submitter pod. +func (v *VolcanoBatchScheduler) addMetadataToSubmitterPod(_ context.Context, app *rayv1.RayJob, _ string, job *batchv1.Job) { + submitterTemplate := &job.Spec.Template + if submitterTemplate.Labels == nil { + submitterTemplate.Labels = make(map[string]string) + } + if submitterTemplate.Annotations == nil { + submitterTemplate.Annotations = make(map[string]string) + } + + submitterTemplate.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app) + submitterTemplate.Annotations[volcanobatchv1alpha1.TaskSpecKey] = utils.RayNodeSubmitterGroupLabelValue + + v.copySchedulingLabels(app, submitterTemplate.Labels) + submitterTemplate.Spec.SchedulerName = v.Name() } func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) { - if err := volcanov1beta1.AddToScheme(cli.Scheme()); err != nil { + if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil { return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err) } return &VolcanoBatchScheduler{ @@ -156,9 +237,9 @@ func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, c } func (vf *VolcanoBatchSchedulerFactory) AddToScheme(scheme *runtime.Scheme) { - utilruntime.Must(volcanov1beta1.AddToScheme(scheme)) + utilruntime.Must(volcanoschedulingv1beta1.AddToScheme(scheme)) } func (vf *VolcanoBatchSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder { - return b.Owns(&volcanov1beta1.PodGroup{}) -} + return b.Owns(&volcanoschedulingv1beta1.PodGroup{}) +} \ No newline at end of file From 973fe82244d38ed3f3de8f7b3fd4cdbdadce6b7f Mon Sep 17 00:00:00 2001 From: win5923 Date: Thu, 4 Sep 2025 16:17:07 +0000 Subject: [PATCH 02/12] [Feature] RayJob Volcano integration Signed-off-by: win5923 --- .../ray-job.volcano-scheduler-queue.yaml | 143 +++++++++++ .../volcano/volcano_scheduler.go | 2 +- .../volcano/volcano_scheduler_test.go | 228 ++++++++++++++++++ .../yunikorn/yunikorn_scheduler.go | 213 +++++----------- .../controllers/ray/raycluster_controller.go | 4 +- .../controllers/ray/rayjob_controller.go | 19 ++ 6 files changed, 456 insertions(+), 153 deletions(-) create mode 100644 ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml diff --git a/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml b/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml new file mode 100644 index 00000000000..072eb7efe17 
--- /dev/null +++ b/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml @@ -0,0 +1,143 @@ +apiVersion: scheduling.volcano.sh/v1beta1 +kind: Queue +metadata: + name: kuberay-test-queue +spec: + weight: 1 + capability: + cpu: 4 + memory: 6Gi +--- +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: rayjob-sample-2 + labels: + ray.io/scheduler-name: volcano + volcano.sh/queue-name: kuberay-test-queue +spec: + entrypoint: python /home/ray/samples/sample_code.py + runtimeEnvYAML: | + pip: + - requests==2.26.0 + - pendulum==2.1.2 + env_vars: + counter_name: "test_counter" + # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller. + rayClusterSpec: + rayVersion: '2.46.0' # should match the Ray version in the image of the containers + # Ray head pod template + headGroupSpec: + # The `rayStartParams` are used to configure the `ray start` command. + # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. + # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. + rayStartParams: {} + #pod template + template: + spec: + containers: + - name: ray-head + image: rayproject/ray:2.46.0 + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 # Ray dashboard + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "1" + memory: "2Gi" + volumeMounts: + - mountPath: /home/ray/samples + name: code-sample + volumes: + # You set volumes at the Pod level, then mount them into containers inside that Pod + - name: code-sample + configMap: + # Provide the name of the ConfigMap you want to mount. + name: ray-job-code-sample + # An array of keys from the ConfigMap to create as files + items: + - key: sample_code.py + path: sample_code.py + workerGroupSpecs: + # the pod replicas in this group typed worker + - replicas: 2 + minReplicas: 2 + maxReplicas: 2 + # logical group name, for this called small-group, also can be functional + groupName: small-group + # The `rayStartParams` are used to configure the `ray start` command. + # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. + # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. + rayStartParams: {} + #pod template + template: + spec: + containers: + - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc' + image: rayproject/ray:2.46.0 + resources: + limits: + cpu: "1" + memory: "1Gi" + requests: + cpu: "1" + memory: "1Gi" + + # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster. + # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container. + # submitterPodTemplate: + # spec: + # restartPolicy: Never + # containers: + # - name: my-custom-rayjob-submitter-pod + # image: rayproject/ray:2.46.0 + # # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field. + # # Specifying Command is not recommended. 
+ # # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"] + + +######################Ray code sample################################# +# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example +# it is mounted into the container and executed to show the Ray job at work +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: ray-job-code-sample +data: + sample_code.py: | + import ray + import os + import requests + + ray.init() + + @ray.remote + class Counter: + def __init__(self): + # Used to verify runtimeEnv + self.name = os.getenv("counter_name") + assert self.name == "test_counter" + self.counter = 0 + + def inc(self): + self.counter += 1 + + def get_counter(self): + return "{} got {}".format(self.name, self.counter) + + counter = Counter.remote() + + for _ in range(5): + ray.get(counter.inc.remote()) + print(ray.get(counter.get_counter.remote())) + + # Verify that the correct runtime env was used for the job. + assert requests.__version__ == "2.26.0" diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index c772ff33736..e578e48b271 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -242,4 +242,4 @@ func (vf *VolcanoBatchSchedulerFactory) AddToScheme(scheme *runtime.Scheme) { func (vf *VolcanoBatchSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder { return b.Owns(&volcanoschedulingv1beta1.PodGroup{}) -} \ No newline at end of file +} diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go index 2e810b34302..e3efa794a7e 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go @@ -5,10 +5,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" + volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1" + volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" @@ -78,6 +81,86 @@ func createTestRayCluster(numOfHosts int32) rayv1.RayCluster { } } +func createTestRayJob(numOfHosts int32) rayv1.RayJob { + headSpec := corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "ray-head", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("256m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + } + + workerSpec := corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "ray-worker", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: 
resource.MustParse("256m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + } + + return rayv1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rayjob-sample", + Namespace: "default", + Labels: map[string]string{ + QueueNameLabelKey: "test-queue", + utils.RayPriorityClassName: "high-priority", + }, + }, + Spec: rayv1.RayJobSpec{ + RayClusterSpec: &rayv1.RayClusterSpec{ + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: headSpec, + }, + }, + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + Template: corev1.PodTemplateSpec{ + Spec: workerSpec, + }, + Replicas: ptr.To[int32](2), + NumOfHosts: numOfHosts, + MinReplicas: ptr.To[int32](1), + MaxReplicas: ptr.To[int32](4), + }, + }, + }, + }, + } +} + +func createTestRayClusterFromRayJob(rayJobName string) rayv1.RayCluster { + cluster := createTestRayCluster(1) + cluster.Name = "raycluster-from-rayjob" + if cluster.Labels == nil { + cluster.Labels = make(map[string]string) + } + cluster.Labels[utils.RayOriginatedFromCRDLabelKey] = utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) + cluster.Labels[utils.RayOriginatedFromCRNameLabelKey] = rayJobName + return cluster +} + func TestCreatePodGroup(t *testing.T) { a := assert.New(t) @@ -129,3 +212,148 @@ func TestCreatePodGroup_NumOfHosts2(t *testing.T) { // 2 GPUs * 2 = 4 GPUs a.Equal("4", pg.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String()) } + +func TestCreatePodGroupForRayJob(t *testing.T) { + a := assert.New(t) + + rayJob := createTestRayJob(1) + + // Create RayCluster from RayJob spec for calculation + rayCluster := &rayv1.RayCluster{ + Spec: *rayJob.Spec.RayClusterSpec, + } + + minMember := utils.CalculateDesiredReplicas(context.Background(), rayCluster) + 1 + totalResource := utils.CalculateDesiredResources(rayCluster) + pg := createPodGroup(&rayJob, getAppPodGroupName(&rayJob), minMember, totalResource) + + a.Equal(rayJob.Namespace, pg.Namespace) + a.Equal("ray-rayjob-sample-pg", pg.Name) + + // Verify owner reference is set to RayJob + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + a.Equal(rayJob.Name, pg.OwnerReferences[0].Name) + + // Verify queue and priority class are set from RayJob labels + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("high-priority", pg.Spec.PriorityClassName) + + // 1 head + 2 workers (desired, not min replicas) + a.Equal(int32(3), pg.Spec.MinMember) +} + +func TestAddMetadataToPod(t *testing.T) { + a := assert.New(t) + scheduler := &VolcanoBatchScheduler{} + + t.Run("RayCluster from RayJob", func(_ *testing.T) { + cluster := createTestRayClusterFromRayJob("test-rayjob") + cluster.Labels[QueueNameLabelKey] = "test-queue" + cluster.Labels[utils.RayPriorityClassName] = "high-priority" + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: make(map[string]string), + Annotations: make(map[string]string), + }, + Spec: corev1.PodSpec{}, + } + + scheduler.AddMetadataToPod(context.Background(), &cluster, "worker", pod) + + // Should use RayJob name for pod group name + a.Equal("ray-test-rayjob-pg", pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) + a.Equal("worker", pod.Annotations[volcanobatchv1alpha1.TaskSpecKey]) + a.Equal("volcano", pod.Spec.SchedulerName) + a.Equal("test-queue", pod.Labels[QueueNameLabelKey]) + a.Equal("high-priority", pod.Labels[utils.RayPriorityClassName]) + }) + + t.Run("Normal RayCluster", func(_ *testing.T) { + cluster := createTestRayCluster(1) + cluster.Labels = map[string]string{ + 
QueueNameLabelKey: "test-queue", + utils.RayPriorityClassName: "high-priority", + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: make(map[string]string), + Annotations: make(map[string]string), + }, + Spec: corev1.PodSpec{}, + } + + scheduler.AddMetadataToPod(context.Background(), &cluster, "head", pod) + + a.Equal("ray-raycluster-sample-pg", pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) + a.Equal("head", pod.Annotations[volcanobatchv1alpha1.TaskSpecKey]) + a.Equal("volcano", pod.Spec.SchedulerName) + }) +} + +func TestAddMetadataToSubmitterPod(t *testing.T) { + a := assert.New(t) + scheduler := &VolcanoBatchScheduler{} + + rayJob := createTestRayJob(1) + + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{}, + Spec: corev1.PodSpec{}, + }, + }, + } + + scheduler.addMetadataToSubmitterPod(context.Background(), &rayJob, "submitter", job) + + // Check annotations + a.Equal("ray-rayjob-sample-pg", job.Spec.Template.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) + a.Equal(utils.RayNodeSubmitterGroupLabelValue, job.Spec.Template.Annotations[volcanobatchv1alpha1.TaskSpecKey]) + + // Check labels are copied from RayJob + a.Equal("test-queue", job.Spec.Template.Labels[QueueNameLabelKey]) + a.Equal("high-priority", job.Spec.Template.Labels[utils.RayPriorityClassName]) + + // Check scheduler name + a.Equal("volcano", job.Spec.Template.Spec.SchedulerName) +} + +func TestCalculatePodGroupParams(t *testing.T) { + a := assert.New(t) + scheduler := &VolcanoBatchScheduler{} + + t.Run("Autoscaling disabled", func(_ *testing.T) { + cluster := createTestRayCluster(1) + + minMember, totalResource := scheduler.calculatePodGroupParams(context.Background(), &cluster.Spec) + + // 1 head + 2 workers (desired replicas) + a.Equal(int32(3), minMember) + + // 256m * 3 (requests, not limits) + a.Equal("768m", totalResource.Cpu().String()) + + // 256Mi * 3 (requests, not limits) + a.Equal("768Mi", totalResource.Memory().String()) + }) + + t.Run("Autoscaling enabled", func(_ *testing.T) { + cluster := createTestRayCluster(1) + cluster.Spec.EnableInTreeAutoscaling = ptr.To(true) + + minMember, totalResource := scheduler.calculatePodGroupParams(context.Background(), &cluster.Spec) + + // 1 head + 1 worker (min replicas) + a.Equal(int32(2), minMember) + + // 256m * 2 (requests, not limits) + a.Equal("512m", totalResource.Cpu().String()) + + // 256Mi * 2 (requests, not limits) + a.Equal("512Mi", totalResource.Memory().String()) + }) +} diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index a7bcfda8d64..1c37511038e 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -1,184 +1,97 @@ -package yunikorn +package batchscheduler import ( "context" + "fmt" + "sync" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" - 
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler" + schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn" ) -const ( - SchedulerName string = "yunikorn" - YuniKornPodApplicationIDLabelName string = "applicationId" - YuniKornPodQueueLabelName string = "queue" - RayApplicationIDLabelName string = "yunikorn.apache.org/app-id" - RayApplicationQueueLabelName string = "yunikorn.apache.org/queue" - YuniKornTaskGroupNameAnnotationName string = "yunikorn.apache.org/task-group-name" - YuniKornTaskGroupsAnnotationName string = "yunikorn.apache.org/task-groups" -) - -type YuniKornScheduler struct{} - -type YuniKornSchedulerFactory struct{} - -func GetPluginName() string { - return SchedulerName -} - -func (y *YuniKornScheduler) Name() string { - return GetPluginName() -} - -func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1.Object) error { - // yunikorn doesn't require any resources to be created upfront - // this is a no-opt for this implementation - return nil -} - -// propagateTaskGroupsAnnotation is a helper function that propagates the task groups annotation to the child -// if the parent has the task groups annotation, it will be copied to the child -// if the parent doesn't have the task groups annotation, a new one will be created -// TODO: remove the legacy labels, i.e "applicationId" and "queue", directly populate labels -// RayApplicationIDLabelName and RayApplicationQueueLabelName to pod labels. -// Currently we use this function to translate labels "yunikorn.apache.org/app-id" and "yunikorn.apache.org/queue" -// to legacy labels "applicationId" and "queue", this is for the better compatibilities to support older yunikorn -// versions. 
-func propagateTaskGroupsAnnotation(parent metav1.Object, child metav1.Object) error { - var taskGroupsAnnotationValue string - if parentAnnotations, exist := parent.GetAnnotations()[YuniKornTaskGroupsAnnotationName]; exist && parentAnnotations != "" { - taskGroupsAnnotationValue = parentAnnotations - } - if taskGroupsAnnotationValue == "" { - var err error - taskGroupsAnnotationValue, err = getTaskGroupsAnnotationValue(parent) - if err != nil { - return err - } - } - annotations := child.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) - } - annotations[YuniKornTaskGroupsAnnotationName] = taskGroupsAnnotationValue - child.SetAnnotations(annotations) - return nil -} - -func populateLabelsFromObject(parent metav1.Object, child metav1.Object, sourceKey string, targetKey string) { - labels := child.GetLabels() - if labels == nil { - labels = make(map[string]string) - } - if parentLabel, exist := parent.GetLabels()[sourceKey]; exist && parentLabel != "" { - labels[targetKey] = parentLabel - } - child.SetLabels(labels) +type SchedulerManager struct { + config *rest.Config + factory schedulerinterface.BatchSchedulerFactory + scheduler schedulerinterface.BatchScheduler + rayConfigs configapi.Configuration + sync.Mutex } -func addSchedulerNameToObject(obj metav1.Object, schedulerName string) { - switch obj := obj.(type) { - case *corev1.Pod: - obj.Spec.SchedulerName = schedulerName - case *corev1.PodTemplateSpec: - obj.Spec.SchedulerName = schedulerName +// NewSchedulerManager maintains a specific scheduler plugin based on config +func NewSchedulerManager(ctx context.Context, rayConfigs configapi.Configuration, config *rest.Config, cli client.Client) (*SchedulerManager, error) { + // init the scheduler factory from config + factory, err := getSchedulerFactory(rayConfigs) + if err != nil { + return nil, err } -} -func getTaskGroupsAnnotationValue(obj metav1.Object) (string, error) { - taskGroups := newTaskGroups() - switch obj := obj.(type) { - case *rayv1.RayCluster: - taskGroups = newTaskGroupsFromRayClusterSpec(&obj.Spec) - case *rayv1.RayJob: - taskGroups = newTaskGroupsFromRayJobSpec(&obj.Spec) - } - taskGroupsAnnotationValue, err := taskGroups.marshal() + scheduler, err := factory.New(ctx, config, cli) if err != nil { - return "", err + return nil, err } - return taskGroupsAnnotationValue, nil -} -func addTaskGroupNameAnnotation(obj metav1.Object, groupName string) { - annotations := obj.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) + manager := SchedulerManager{ + rayConfigs: rayConfigs, + config: config, + factory: factory, + scheduler: scheduler, } - annotations[YuniKornTaskGroupNameAnnotationName] = groupName - obj.SetAnnotations(annotations) -} -func (y *YuniKornScheduler) isGangSchedulingEnabled(obj metav1.Object) bool { - _, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled] - return exist + return &manager, nil } -// AddMetadataToPod adds essential labels and annotations to the Ray pod -// the yunikorn scheduler needs these labels and annotations in order to do the scheduling properly -func (y *YuniKornScheduler) AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod) { - logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName) - // the applicationID and queue name must be provided in the labels - populateLabelsFromObject(rayCluster, pod, RayApplicationIDLabelName, YuniKornPodApplicationIDLabelName) - populateLabelsFromObject(rayCluster, pod, 
RayApplicationQueueLabelName, YuniKornPodQueueLabelName) - addSchedulerNameToObject(pod, y.Name()) - - // when gang scheduling is enabled, extra annotations need to be added to all pods - if y.isGangSchedulingEnabled(rayCluster) { - // populate the taskGroups info to each pod - err := propagateTaskGroupsAnnotation(rayCluster, pod) - if err != nil { - logger.Error(err, "failed to add gang scheduling related annotations to pod, "+ - "gang scheduling will not be enabled for this workload", - "name", pod.Name, "namespace", pod.Namespace) - return +func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface.BatchSchedulerFactory, error) { + var factory schedulerinterface.BatchSchedulerFactory + + // when a batch scheduler name is provided + // only support a white list of names, empty value is the default value + // it throws error if an unknown name is provided + if len(rayConfigs.BatchScheduler) > 0 { + switch rayConfigs.BatchScheduler { + case volcano.GetPluginName(): + factory = &volcano.VolcanoBatchSchedulerFactory{} + case yunikorn.GetPluginName(): + factory = &yunikorn.YuniKornSchedulerFactory{} + case kaischeduler.GetPluginName(): + factory = &kaischeduler.KaiSchedulerFactory{} + case schedulerplugins.GetPluginName(): + factory = &schedulerplugins.KubeSchedulerFactory{} + default: + return nil, fmt.Errorf("the scheduler is not supported, name=%s", rayConfigs.BatchScheduler) } - - // set the task group name based on the head or worker group name - // the group name for the head and each of the worker group should be different - pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName + } else { + // empty is the default value, when not set + // use DefaultBatchSchedulerFactory, it's a no-opt factory + factory = &schedulerinterface.DefaultBatchSchedulerFactory{} } -} -func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string) { - logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName) - - populateLabelsFromObject(parent, child, RayApplicationIDLabelName, YuniKornPodApplicationIDLabelName) - populateLabelsFromObject(parent, child, RayApplicationQueueLabelName, YuniKornPodQueueLabelName) - addSchedulerNameToObject(child, y.Name()) - - if y.isGangSchedulingEnabled(parent) { - logger.Info("gang scheduling is enabled, propagating task groups annotation to child", "name", child.GetName(), "namespace", child.GetNamespace()) - - err := propagateTaskGroupsAnnotation(parent, child) - if err != nil { - logger.Error(err, "failed to add gang scheduling related annotations to object, "+ - "gang scheduling will not be enabled for this workload", - "name", child.GetName(), "namespace", child.GetNamespace()) - return - } - if groupName != "" { - addTaskGroupNameAnnotation(child, groupName) - } + // legacy option, if this is enabled, register volcano + // this is for backward compatibility + if rayConfigs.EnableBatchScheduler { + factory = &volcano.VolcanoBatchSchedulerFactory{} } -} -func (yf *YuniKornSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) { - return &YuniKornScheduler{}, nil + return factory, nil } -func (yf *YuniKornSchedulerFactory) AddToScheme(_ *runtime.Scheme) { - // No extra scheme needs to be registered +func (batch *SchedulerManager) GetScheduler() (schedulerinterface.BatchScheduler, error) { + return batch.scheduler, nil } -func (yf *YuniKornSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder 
{
+func (batch *SchedulerManager) ConfigureReconciler(b *builder.Builder) *builder.Builder {
+	batch.factory.ConfigureReconciler(b)
 	return b
 }
+
+func (batch *SchedulerManager) AddToScheme(scheme *runtime.Scheme) {
+	batch.factory.AddToScheme(scheme)
+}
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go
index 17d6616f039..44a84ba2652 100644
--- a/ray-operator/controllers/ray/raycluster_controller.go
+++ b/ray-operator/controllers/ray/raycluster_controller.go
@@ -927,7 +927,7 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1
 	// check if the batch scheduler integration is enabled
 	// call the scheduler plugin if so
 	if r.options.BatchSchedulerManager != nil {
-		if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
+		if scheduler, err := r.options.BatchSchedulerManager.GetSchedulerForCluster(); err == nil {
 			scheduler.AddMetadataToPod(ctx, &instance, utils.RayNodeHeadGroupLabelValue, &pod)
 		} else {
 			return err
@@ -950,7 +950,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
 	// build the pod then create it
 	pod := r.buildWorkerPod(ctx, instance, worker)
 	if r.options.BatchSchedulerManager != nil {
-		if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
+		if scheduler, err := r.options.BatchSchedulerManager.GetSchedulerForCluster(); err == nil {
 			scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod)
 		} else {
 			return err
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index fe01233e6ca..6985a1858e6 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -180,6 +180,16 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 			break
 		}
 
+		if r.options.BatchSchedulerManager != nil {
+			if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
+				if err := scheduler.DoBatchSchedulingOnSubmission(ctx, rayJobInstance); err != nil {
+					return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+				}
+			} else {
+				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+			}
+		}
+
 		if shouldUpdate := checkActiveDeadlineAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
 			break
 		}
@@ -729,6 +739,15 @@ func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *
 		return err
 	}
 
+	// Add batch scheduler metadata to submitter job
+	if r.options.BatchSchedulerManager != nil {
+		if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
+			scheduler.AddMetadataToChildResource(ctx, rayJobInstance, utils.RayNodeSubmitterGroupLabelValue, job)
+		} else {
+			logger.Error(err, "Failed to get batch scheduler for adding metadata to submitter job")
+		}
+	}
+
 	// Create the Kubernetes Job
 	if err := r.Client.Create(ctx, job); err != nil {
 		logger.Error(err, "Failed to create new submitter Kubernetes Job for RayJob")

From 2ee7e8a67b894706fa0c12cb9fbd6c3bfe6c0934 Mon Sep 17 00:00:00 2001
From: win5923
Date: Thu, 11 Sep 2025 16:15:00 +0000
Subject: [PATCH 03/12] Modify kai scheduler and scheduler plugins

Signed-off-by: win5923
---
 .../ray-job.volcano-scheduler-queue.yaml      |  40 +---
 .../ray/batchscheduler/interface/interface.go |  11 -
 .../kai-scheduler/kai_scheduler.go            |  27 ++-
 .../kai-scheduler/kai_scheduler_test.go       |  24 +--
 .../scheduler-plugins/scheduler_plugins.go    |  29 ++-
.../scheduler_plugins_test.go | 4 +- .../volcano/volcano_scheduler.go | 180 ++++++++--------- .../volcano/volcano_scheduler_test.go | 107 +++------- .../yunikorn/yunikorn_scheduler.go | 191 ++++++++++++------ .../controllers/ray/raycluster_controller.go | 8 +- .../controllers/ray/rayjob_controller.go | 19 -- 11 files changed, 296 insertions(+), 344 deletions(-) diff --git a/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml b/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml index 072eb7efe17..96b97a3042f 100644 --- a/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml +++ b/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml @@ -11,7 +11,7 @@ spec: apiVersion: ray.io/v1 kind: RayJob metadata: - name: rayjob-sample-2 + name: rayjob-sample-0 labels: ray.io/scheduler-name: volcano volcano.sh/queue-name: kuberay-test-queue @@ -23,16 +23,10 @@ spec: - pendulum==2.1.2 env_vars: counter_name: "test_counter" - # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller. rayClusterSpec: - rayVersion: '2.46.0' # should match the Ray version in the image of the containers - # Ray head pod template + rayVersion: '2.46.0' headGroupSpec: - # The `rayStartParams` are used to configure the `ray start` command. - # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. - # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. rayStartParams: {} - #pod template template: spec: containers: @@ -41,7 +35,7 @@ spec: ports: - containerPort: 6379 name: gcs-server - - containerPort: 8265 # Ray dashboard + - containerPort: 8265 name: dashboard - containerPort: 10001 name: client @@ -56,31 +50,22 @@ spec: - mountPath: /home/ray/samples name: code-sample volumes: - # You set volumes at the Pod level, then mount them into containers inside that Pod - name: code-sample configMap: - # Provide the name of the ConfigMap you want to mount. name: ray-job-code-sample - # An array of keys from the ConfigMap to create as files items: - key: sample_code.py path: sample_code.py workerGroupSpecs: - # the pod replicas in this group typed worker - replicas: 2 minReplicas: 2 maxReplicas: 2 - # logical group name, for this called small-group, also can be functional groupName: small-group - # The `rayStartParams` are used to configure the `ray start` command. - # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. - # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. rayStartParams: {} - #pod template template: spec: containers: - - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc' + - name: ray-worker image: rayproject/ray:2.46.0 resources: limits: @@ -89,23 +74,6 @@ spec: requests: cpu: "1" memory: "1Gi" - - # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster. - # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container. 
- # submitterPodTemplate: - # spec: - # restartPolicy: Never - # containers: - # - name: my-custom-rayjob-submitter-pod - # image: rayproject/ray:2.46.0 - # # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field. - # # Specifying Command is not recommended. - # # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"] - - -######################Ray code sample################################# -# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example -# it is mounted into the container and executed to show the Ray job at work --- apiVersion: v1 kind: ConfigMap diff --git a/ray-operator/controllers/ray/batchscheduler/interface/interface.go b/ray-operator/controllers/ray/batchscheduler/interface/interface.go index 066ea79f102..8f93a938a39 100644 --- a/ray-operator/controllers/ray/batchscheduler/interface/interface.go +++ b/ray-operator/controllers/ray/batchscheduler/interface/interface.go @@ -3,14 +3,11 @@ package schedulerinterface import ( "context" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" ) // BatchScheduler manages submitting RayCluster pods to a third-party scheduler. @@ -23,11 +20,6 @@ type BatchScheduler interface { // For most batch schedulers, this results in the creation of a PodGroup. DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error - // AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler. - // For example, setting labels for queues / priority, and setting schedulerName. - // This function will be removed once Rayjob Volcano scheduler integration is completed. - AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod) - // AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler. // For example, setting labels for queues / priority, and setting schedulerName. 
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string) @@ -63,9 +55,6 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, return nil } -func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) { -} - func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { } diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go index 863d24fcf3e..2257127db5d 100644 --- a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go @@ -18,7 +18,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" ) @@ -38,23 +37,33 @@ func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1 return nil } -func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) { +func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, _ string) { logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler") - pod.Spec.SchedulerName = k.Name() + addSchedulerNameToObject(child, k.Name()) - queue, ok := app.Labels[QueueLabelName] + parentLabel := parent.GetLabels() + queue, ok := parentLabel[QueueLabelName] if !ok || queue == "" { - logger.Info("Queue label missing from RayCluster; pods will remain pending", + logger.Info("Queue label missing from parent; child will remain pending", "requiredLabel", QueueLabelName) return } - if pod.Labels == nil { - pod.Labels = make(map[string]string) + + childLabels := child.GetLabels() + if childLabels == nil { + childLabels = make(map[string]string) } - pod.Labels[QueueLabelName] = queue + childLabels[QueueLabelName] = queue + child.SetLabels(childLabels) } -func (k *KaiScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { +func addSchedulerNameToObject(obj metav1.Object, schedulerName string) { + switch obj := obj.(type) { + case *corev1.Pod: + obj.Spec.SchedulerName = schedulerName + case *corev1.PodTemplateSpec: + obj.Spec.SchedulerName = schedulerName + } } func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) { diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go index ed9eaf9549a..6b1213ee6f7 100644 --- a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go @@ -41,7 +41,7 @@ func createTestPod() *corev1.Pod { } } -func TestAddMetadataToPod_WithQueueLabel(t *testing.T) { +func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -52,8 +52,8 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) { }) pod := createTestPod() - // Call AddMetadataToPod - scheduler.AddMetadataToPod(ctx, rayCluster, 
"test-group", pod) + // Call AddMetadataToChildResource + scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") // Assert scheduler name is set to kai-scheduler a.Equal("kai-scheduler", pod.Spec.SchedulerName) @@ -63,7 +63,7 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) { a.Equal("test-queue", pod.Labels[QueueLabelName]) } -func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) { +func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -72,8 +72,8 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) { rayCluster := createTestRayCluster(map[string]string{}) pod := createTestPod() - // Call AddMetadataToPod - scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod) + // Call AddMetadataToChildResource + scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") // Assert scheduler name is still set (always required) a.Equal("kai-scheduler", pod.Spec.SchedulerName) @@ -85,7 +85,7 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) { } } -func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) { +func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -96,8 +96,8 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) { }) pod := createTestPod() - // Call AddMetadataToPod - scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod) + // Call AddMetadataToChildResource + scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") // Assert scheduler name is still set a.Equal("kai-scheduler", pod.Spec.SchedulerName) @@ -109,7 +109,7 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) { } } -func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) { +func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -126,8 +126,8 @@ func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) { "app": "ray", } - // Call AddMetadataToPod - scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod) + // Call AddMetadataToChildResource + scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") // Assert scheduler name is set a.Equal("kai-scheduler", pod.Spec.SchedulerName) diff --git a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go index 7f8cc178531..73b1ec6b65a 100644 --- a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go +++ b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go @@ -93,21 +93,32 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, objec return nil } -// AddMetadataToPod adds essential labels and annotations to the Ray pod +// AddMetadataToPod adds essential labels and annotations to the child resource. 
// the scheduler needs these labels and annotations in order to do the scheduling properly -func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) { - // when gang scheduling is enabled, extra labels need to be added to all pods - if k.isGangSchedulingEnabled(rayCluster) { - pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name +func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, _ string) { + // when gang scheduling is enabled, extra labels need to be added to all child resources + if k.isGangSchedulingEnabled(parent) { + labels := child.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + labels[kubeSchedulerPodGroupLabelKey] = parent.GetName() + child.SetLabels(labels) } - pod.Spec.SchedulerName = k.Name() + addSchedulerNameToObject(child, k.Name()) } -func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { +func addSchedulerNameToObject(obj metav1.Object, schedulerName string) { + switch obj := obj.(type) { + case *corev1.Pod: + obj.Spec.SchedulerName = schedulerName + case *corev1.PodTemplateSpec: + obj.Spec.SchedulerName = schedulerName + } } -func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool { - _, exist := app.Labels[utils.RayGangSchedulingEnabled] +func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool { + _, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled] return exist } diff --git a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go index f5f58977467..e2201b6cf71 100644 --- a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go +++ b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go @@ -117,7 +117,7 @@ func TestCreatePodGroupWithMultipleHosts(t *testing.T) { a.Equal(int32(5), podGroup.Spec.MinMember) } -func TestAddMetadataToPod(t *testing.T) { +func TestAddMetadataToChildResource(t *testing.T) { tests := []struct { name string enableGang bool @@ -150,7 +150,7 @@ func TestAddMetadataToPod(t *testing.T) { } scheduler := &KubeScheduler{} - scheduler.AddMetadataToPod(context.TODO(), &cluster, "worker", pod) + scheduler.AddMetadataToChildResource(context.TODO(), &cluster, pod, "worker") if tt.enableGang { a.Equal(cluster.Name, pod.Labels[kubeSchedulerPodGroupLabelKey]) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index e578e48b271..eb0b3b8eafa 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,6 +25,7 @@ import ( const ( PodGroupName = "podgroups.scheduling.volcano.sh" + pluginName = "volcano" QueueNameLabelKey = "volcano.sh/queue-name" ) @@ -35,15 +35,13 @@ type VolcanoBatchScheduler struct { type VolcanoBatchSchedulerFactory struct{} -func GetPluginName() string { - return "volcano" -} +func GetPluginName() string { return pluginName } func (v *VolcanoBatchScheduler) Name() string { return GetPluginName() } -func (v 
*VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object client.Object) error { +func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error { switch obj := object.(type) { case *rayv1.RayCluster: return v.handleRayCluster(ctx, obj) @@ -54,56 +52,109 @@ func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Contex } } -func getAppPodGroupName(object client.Object) string { - // If the object is a RayCluster created by a RayJob, use the RayJob's name - if labels := object.GetLabels(); labels != nil { - if labels[utils.RayOriginatedFromCRDLabelKey] == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) { - if rayJobName, ok := labels[utils.RayOriginatedFromCRNameLabelKey]; ok { - return fmt.Sprintf("ray-%s-pg", rayJobName) - } +// handleRayCluster calculates the PodGroup MinMember and MinResources for a RayCluster +func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster *rayv1.RayCluster) error { + // Check if this RayCluster is created by a RayJob, if so, skip PodGroup creation + if crdType, ok := raycluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok && crdType == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) { + return nil + } + + minMember, totalResource := v.calculatePodGroupParams(ctx, &raycluster.Spec) + + return v.syncPodGroup(ctx, raycluster, minMember, totalResource) +} + +// handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob +// The submitter pod is intentionally excluded from MinMember calculation. +// Including it before the RayCluster is ready may prevent the PodGroup from +// ever meeting the MinMember requirement, leaving all pods stuck in Pending. +func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error { + if rayJob.Spec.RayClusterSpec == nil { + return fmt.Errorf("gang scheduling does not support RayJob %s/%s referencing an existing RayCluster", rayJob.Namespace, rayJob.Name) + } + + minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec) + + return v.syncPodGroup(ctx, rayJob, minMember, totalResource) +} + +func getAppPodGroupName(object metav1.Object) string { + // Prefer the RayJob name if this object originated from a RayJob + name := object.GetName() + if labels := object.GetLabels(); labels != nil && + labels[utils.RayOriginatedFromCRDLabelKey] == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) { + if rayJobName := labels[utils.RayOriginatedFromCRNameLabelKey]; rayJobName != "" { + name = rayJobName } } - return fmt.Sprintf("ray-%s-pg", object.GetName()) + return fmt.Sprintf("ray-%s-pg", name) } -// copySchedulingLabels copies scheduling-related labels from source to target labels map. 
-func (v *VolcanoBatchScheduler) copySchedulingLabels(source client.Object, targetLabels map[string]string) { - if queue, ok := source.GetLabels()[QueueNameLabelKey]; ok { - targetLabels[QueueNameLabelKey] = queue +func addSchedulerName(obj metav1.Object, schedulerName string) { + switch obj := obj.(type) { + case *corev1.Pod: + obj.Spec.SchedulerName = schedulerName + case *corev1.PodTemplateSpec: + obj.Spec.SchedulerName = schedulerName } - if priorityClassName, ok := source.GetLabels()[utils.RayPriorityClassName]; ok { - targetLabels[utils.RayPriorityClassName] = priorityClassName +} + +func populateAnnotations(parent metav1.Object, child metav1.Object, groupName string) { + annotations := child.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) } + annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(parent) + + switch child.(type) { + case *corev1.Pod: + annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName + case *corev1.PodTemplateSpec: + annotations[volcanobatchv1alpha1.TaskSpecKey] = utils.RayNodeSubmitterGroupLabelValue + } + child.SetAnnotations(annotations) +} + +func populateLabelsFromObject(parent metav1.Object, child metav1.Object, key string) { + labels := child.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + if parentLabel, exist := parent.GetLabels()[key]; exist && parentLabel != "" { + labels[key] = parentLabel + } + child.SetLabels(labels) } // syncPodGroup ensures a Volcano PodGroup exists/updated for the given object // with the provided size (MinMember) and total resources. -func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner client.Object, size int32, totalResource corev1.ResourceList) error { - logger := ctrl.LoggerFrom(ctx).WithName(v.Name()) +func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.Object, size int32, totalResource corev1.ResourceList) error { + logger := ctrl.LoggerFrom(ctx).WithName(pluginName) podGroupName := getAppPodGroupName(owner) podGroup := volcanoschedulingv1beta1.PodGroup{} if err := v.cli.Get(ctx, types.NamespacedName{Namespace: owner.GetNamespace(), Name: podGroupName}, &podGroup); err != nil { if !errors.IsNotFound(err) { + logger.Error(err, "failed to get PodGroup", "name", podGroupName) return err } podGroup := createPodGroup(owner, podGroupName, size, totalResource) if err := v.cli.Create(ctx, &podGroup); err != nil { if errors.IsAlreadyExists(err) { - logger.Info("pod group already exists, no need to create") + logger.Info("podGroup already exists, no need to create", "name", podGroupName) return nil } - logger.Error(err, "Pod group CREATE error!", "PodGroup.Error", err) + logger.Error(err, "failed to create PodGroup", "name", podGroupName) return err } } else { - if podGroup.Spec.MinMember != size || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) { + if podGroup.Spec.MinMember != size || podGroup.Spec.MinResources == nil || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) { podGroup.Spec.MinMember = size podGroup.Spec.MinResources = &totalResource if err := v.cli.Update(ctx, &podGroup); err != nil { - logger.Error(err, "Pod group UPDATE error!", "podGroup", podGroupName) + logger.Error(err, "failed to update PodGroup", "name", podGroupName) return err } } @@ -111,7 +162,6 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner client.O return nil } -// calculatePodGroupParams calculates MinMember and MinResources for a RayCluster spec func (v 
*VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, rayClusterSpec *rayv1.RayClusterSpec) (int32, corev1.ResourceList) { rayCluster := &rayv1.RayCluster{Spec: *rayClusterSpec} @@ -121,37 +171,7 @@ func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, ray return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster) } -// handleRayCluster calculates the PodGroup MinMember and MinResources for a RayCluster -// and creates/updates the corresponding PodGroup unless the cluster originated from a RayJob. -func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster *rayv1.RayCluster) error { - // Check if this RayCluster is created by a RayJob, if so, skip PodGroup creation - if crdType, ok := raycluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok && crdType == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) { - return nil - } - - minMember, totalResource := v.calculatePodGroupParams(ctx, &raycluster.Spec) - - return v.syncPodGroup(ctx, raycluster, minMember, totalResource) -} - -// handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob -// based on its embedded RayCluster spec and creates/updates the corresponding PodGroup. -// Note: We intentionally do NOT include the submitter pod in MinMember since the RayCluster -// may not be ready yet. -func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error { - // For RayJob, we need to calculate resources based on the RayClusterSpec - // Not support using existing RayCluster - if rayJob.Spec.RayClusterSpec == nil { - return fmt.Errorf("RayJob %s/%s does not have RayClusterSpec defined", rayJob.Namespace, rayJob.Name) - } - - minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec) - - return v.syncPodGroup(ctx, rayJob, minMember, totalResource) -} - -// createPodGroup builds a Volcano PodGroup owned by the provided owner object. -func createPodGroup(owner client.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup { +func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup { var ownerRef metav1.OwnerReference switch obj := owner.(type) { case *rayv1.RayCluster: @@ -175,10 +195,10 @@ func createPodGroup(owner client.Object, podGroupName string, size int32, totalR }, } + // Copy scheduling labels to PodGroup spec if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok { podGroup.Spec.Queue = queue } - if priorityClassName, ok := owner.GetLabels()[utils.RayPriorityClassName]; ok { podGroup.Spec.PriorityClassName = priorityClassName } @@ -186,45 +206,11 @@ func createPodGroup(owner client.Object, podGroupName string, size int32, totalR return podGroup } -// AddMetadataToChildResource enriches child resource with metadata necessary to tie it to the scheduler. -// For example, setting labels for queues / priority, and setting schedulerName. 
-func (v *VolcanoBatchScheduler) AddMetadataToChildResource(ctx context.Context, parent client.Object, groupName string, child client.Object) { - switch parentObj := parent.(type) { - case *rayv1.RayCluster: - v.AddMetadataToPod(ctx, parentObj, groupName, child.(*corev1.Pod)) - case *rayv1.RayJob: - switch childObj := child.(type) { - case *batchv1.Job: - v.addMetadataToSubmitterPod(ctx, parent.(*rayv1.RayJob), groupName, childObj) - } - } -} - -func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) { - podGroupName := getAppPodGroupName(app) - - pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = podGroupName - pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName - - v.copySchedulingLabels(app, pod.Labels) - pod.Spec.SchedulerName = v.Name() -} - -// addMetadataToSubmitterPod sets Volcano-related metadata on the submitter pod. -func (v *VolcanoBatchScheduler) addMetadataToSubmitterPod(_ context.Context, app *rayv1.RayJob, _ string, job *batchv1.Job) { - submitterTemplate := &job.Spec.Template - if submitterTemplate.Labels == nil { - submitterTemplate.Labels = make(map[string]string) - } - if submitterTemplate.Annotations == nil { - submitterTemplate.Annotations = make(map[string]string) - } - - submitterTemplate.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app) - submitterTemplate.Annotations[volcanobatchv1alpha1.TaskSpecKey] = utils.RayNodeSubmitterGroupLabelValue - - v.copySchedulingLabels(app, submitterTemplate.Labels) - submitterTemplate.Spec.SchedulerName = v.Name() +func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, groupName string) { + populateLabelsFromObject(parent, child, QueueNameLabelKey) + populateLabelsFromObject(parent, child, utils.RayPriorityClassName) + populateAnnotations(parent, child, groupName) + addSchedulerName(child, v.Name()) } func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) { diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go index e3efa794a7e..134af5e0371 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -14,6 +13,7 @@ import ( volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) @@ -150,18 +150,7 @@ func createTestRayJob(numOfHosts int32) rayv1.RayJob { } } -func createTestRayClusterFromRayJob(rayJobName string) rayv1.RayCluster { - cluster := createTestRayCluster(1) - cluster.Name = "raycluster-from-rayjob" - if cluster.Labels == nil { - cluster.Labels = make(map[string]string) - } - cluster.Labels[utils.RayOriginatedFromCRDLabelKey] = utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) - cluster.Labels[utils.RayOriginatedFromCRNameLabelKey] = rayJobName - return cluster -} - -func 
TestCreatePodGroup(t *testing.T) { +func TestCreatePodGroupForRayCluster(t *testing.T) { a := assert.New(t) cluster := createTestRayCluster(1) @@ -185,7 +174,7 @@ func TestCreatePodGroup(t *testing.T) { a.Equal("2", pg.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String()) } -func TestCreatePodGroup_NumOfHosts2(t *testing.T) { +func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) { a := assert.New(t) cluster := createTestRayCluster(2) @@ -243,83 +232,31 @@ func TestCreatePodGroupForRayJob(t *testing.T) { a.Equal(int32(3), pg.Spec.MinMember) } -func TestAddMetadataToPod(t *testing.T) { - a := assert.New(t) - scheduler := &VolcanoBatchScheduler{} - - t.Run("RayCluster from RayJob", func(_ *testing.T) { - cluster := createTestRayClusterFromRayJob("test-rayjob") - cluster.Labels[QueueNameLabelKey] = "test-queue" - cluster.Labels[utils.RayPriorityClassName] = "high-priority" - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: make(map[string]string), - Annotations: make(map[string]string), - }, - Spec: corev1.PodSpec{}, - } - - scheduler.AddMetadataToPod(context.Background(), &cluster, "worker", pod) - - // Should use RayJob name for pod group name - a.Equal("ray-test-rayjob-pg", pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) - a.Equal("worker", pod.Annotations[volcanobatchv1alpha1.TaskSpecKey]) - a.Equal("volcano", pod.Spec.SchedulerName) - a.Equal("test-queue", pod.Labels[QueueNameLabelKey]) - a.Equal("high-priority", pod.Labels[utils.RayPriorityClassName]) - }) - - t.Run("Normal RayCluster", func(_ *testing.T) { - cluster := createTestRayCluster(1) - cluster.Labels = map[string]string{ - QueueNameLabelKey: "test-queue", - utils.RayPriorityClassName: "high-priority", - } - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: make(map[string]string), - Annotations: make(map[string]string), - }, - Spec: corev1.PodSpec{}, - } - - scheduler.AddMetadataToPod(context.Background(), &cluster, "head", pod) - - a.Equal("ray-raycluster-sample-pg", pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) - a.Equal("head", pod.Annotations[volcanobatchv1alpha1.TaskSpecKey]) - a.Equal("volcano", pod.Spec.SchedulerName) - }) -} - func TestAddMetadataToSubmitterPod(t *testing.T) { a := assert.New(t) scheduler := &VolcanoBatchScheduler{} rayJob := createTestRayJob(1) + rayCluster := &rayv1.RayCluster{Spec: *rayJob.Spec.RayClusterSpec} + submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, &rayCluster.Spec) - job := &batchv1.Job{ - Spec: batchv1.JobSpec{ - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{}, - Spec: corev1.PodSpec{}, - }, - }, - } - - scheduler.addMetadataToSubmitterPod(context.Background(), &rayJob, "submitter", job) + scheduler.AddMetadataToChildResource( + context.Background(), + &rayJob, + &submitterTemplate, + utils.RayNodeSubmitterGroupLabelValue, + ) // Check annotations - a.Equal("ray-rayjob-sample-pg", job.Spec.Template.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) - a.Equal(utils.RayNodeSubmitterGroupLabelValue, job.Spec.Template.Annotations[volcanobatchv1alpha1.TaskSpecKey]) + a.Equal(getAppPodGroupName(&rayJob), submitterTemplate.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) + a.Equal(utils.RayNodeSubmitterGroupLabelValue, submitterTemplate.Annotations[volcanobatchv1alpha1.TaskSpecKey]) - // Check labels are copied from RayJob - a.Equal("test-queue", job.Spec.Template.Labels[QueueNameLabelKey]) - a.Equal("high-priority", 
job.Spec.Template.Labels[utils.RayPriorityClassName]) + // Check labels + a.Equal("test-queue", submitterTemplate.Labels[QueueNameLabelKey]) + a.Equal("high-priority", submitterTemplate.Labels[utils.RayPriorityClassName]) // Check scheduler name - a.Equal("volcano", job.Spec.Template.Spec.SchedulerName) + a.Equal(pluginName, submitterTemplate.Spec.SchedulerName) } func TestCalculatePodGroupParams(t *testing.T) { @@ -357,3 +294,13 @@ func TestCalculatePodGroupParams(t *testing.T) { a.Equal("512Mi", totalResource.Memory().String()) }) } + +func TestGetAppPodGroupName(t *testing.T) { + a := assert.New(t) + + rayCluster := &rayv1.RayCluster{ObjectMeta: metav1.ObjectMeta{Name: "raycluster-sample", Namespace: "default"}} + a.Equal("ray-raycluster-sample-pg", getAppPodGroupName(rayCluster)) + + rayJob := createTestRayJob(1) + a.Equal("ray-rayjob-sample-pg", getAppPodGroupName(&rayJob)) +} diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 1c37511038e..37cb5a5c78b 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -1,97 +1,158 @@ -package batchscheduler +package yunikorn import ( "context" - "fmt" - "sync" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" - kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler" - schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins" - "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano" - "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) -type SchedulerManager struct { - config *rest.Config - factory schedulerinterface.BatchSchedulerFactory - scheduler schedulerinterface.BatchScheduler - rayConfigs configapi.Configuration - sync.Mutex +const ( + SchedulerName string = "yunikorn" + YuniKornPodApplicationIDLabelName string = "applicationId" + YuniKornPodQueueLabelName string = "queue" + RayApplicationIDLabelName string = "yunikorn.apache.org/app-id" + RayApplicationQueueLabelName string = "yunikorn.apache.org/queue" + YuniKornTaskGroupNameAnnotationName string = "yunikorn.apache.org/task-group-name" + YuniKornTaskGroupsAnnotationName string = "yunikorn.apache.org/task-groups" +) + +type YuniKornScheduler struct{} + +type YuniKornSchedulerFactory struct{} + +func GetPluginName() string { + return SchedulerName } -// NewSchedulerManager maintains a specific scheduler plugin based on config -func NewSchedulerManager(ctx context.Context, rayConfigs configapi.Configuration, config *rest.Config, cli client.Client) (*SchedulerManager, error) { - // init the scheduler factory from config - factory, err := getSchedulerFactory(rayConfigs) - if err != nil { - return nil, err - } +func (y *YuniKornScheduler) Name() string { + return 
GetPluginName() +} - scheduler, err := factory.New(ctx, config, cli) - if err != nil { - return nil, err +func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1.Object) error { + // yunikorn doesn't require any resources to be created upfront + // this is a no-opt for this implementation + return nil +} + +// propagateTaskGroupsAnnotation is a helper function that propagates the task groups annotation to the child +// if the parent has the task groups annotation, it will be copied to the child +// if the parent doesn't have the task groups annotation, a new one will be created +// TODO: remove the legacy labels, i.e "applicationId" and "queue", directly populate labels +// RayApplicationIDLabelName and RayApplicationQueueLabelName to pod labels. +// Currently we use this function to translate labels "yunikorn.apache.org/app-id" and "yunikorn.apache.org/queue" +// to legacy labels "applicationId" and "queue", this is for the better compatibilities to support older yunikorn +// versions. +func propagateTaskGroupsAnnotation(parent metav1.Object, child metav1.Object) error { + var taskGroupsAnnotationValue string + if parentAnnotations, exist := parent.GetAnnotations()[YuniKornTaskGroupsAnnotationName]; exist && parentAnnotations != "" { + taskGroupsAnnotationValue = parentAnnotations + } + if taskGroupsAnnotationValue == "" { + var err error + taskGroupsAnnotationValue, err = getTaskGroupsAnnotationValue(parent) + if err != nil { + return err + } } + annotations := child.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[YuniKornTaskGroupsAnnotationName] = taskGroupsAnnotationValue + child.SetAnnotations(annotations) + return nil +} - manager := SchedulerManager{ - rayConfigs: rayConfigs, - config: config, - factory: factory, - scheduler: scheduler, +func populateLabelsFromObject(parent metav1.Object, child metav1.Object, sourceKey string, targetKey string) { + labels := child.GetLabels() + if labels == nil { + labels = make(map[string]string) } + if parentLabel, exist := parent.GetLabels()[sourceKey]; exist && parentLabel != "" { + labels[targetKey] = parentLabel + } + child.SetLabels(labels) +} - return &manager, nil +func addSchedulerNameToObject(obj metav1.Object, schedulerName string) { + switch obj := obj.(type) { + case *corev1.Pod: + obj.Spec.SchedulerName = schedulerName + case *corev1.PodTemplateSpec: + obj.Spec.SchedulerName = schedulerName + } } -func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface.BatchSchedulerFactory, error) { - var factory schedulerinterface.BatchSchedulerFactory - - // when a batch scheduler name is provided - // only support a white list of names, empty value is the default value - // it throws error if an unknown name is provided - if len(rayConfigs.BatchScheduler) > 0 { - switch rayConfigs.BatchScheduler { - case volcano.GetPluginName(): - factory = &volcano.VolcanoBatchSchedulerFactory{} - case yunikorn.GetPluginName(): - factory = &yunikorn.YuniKornSchedulerFactory{} - case kaischeduler.GetPluginName(): - factory = &kaischeduler.KaiSchedulerFactory{} - case schedulerplugins.GetPluginName(): - factory = &schedulerplugins.KubeSchedulerFactory{} - default: - return nil, fmt.Errorf("the scheduler is not supported, name=%s", rayConfigs.BatchScheduler) - } - } else { - // empty is the default value, when not set - // use DefaultBatchSchedulerFactory, it's a no-opt factory - factory = &schedulerinterface.DefaultBatchSchedulerFactory{} +func 
getTaskGroupsAnnotationValue(obj metav1.Object) (string, error) { + taskGroups := newTaskGroups() + switch obj := obj.(type) { + case *rayv1.RayCluster: + taskGroups = newTaskGroupsFromRayClusterSpec(&obj.Spec) + case *rayv1.RayJob: + taskGroups = newTaskGroupsFromRayJobSpec(&obj.Spec) + } + taskGroupsAnnotationValue, err := taskGroups.marshal() + if err != nil { + return "", err } + return taskGroupsAnnotationValue, nil +} - // legacy option, if this is enabled, register volcano - // this is for backward compatibility - if rayConfigs.EnableBatchScheduler { - factory = &volcano.VolcanoBatchSchedulerFactory{} +func addTaskGroupNameAnnotation(obj metav1.Object, groupName string) { + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) } + annotations[YuniKornTaskGroupNameAnnotationName] = groupName + obj.SetAnnotations(annotations) +} + +func (y *YuniKornScheduler) isGangSchedulingEnabled(obj metav1.Object) bool { + _, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled] + return exist +} - return factory, nil +func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string) { + logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName) + + populateLabelsFromObject(parent, child, RayApplicationIDLabelName, YuniKornPodApplicationIDLabelName) + populateLabelsFromObject(parent, child, RayApplicationQueueLabelName, YuniKornPodQueueLabelName) + addSchedulerNameToObject(child, y.Name()) + + if y.isGangSchedulingEnabled(parent) { + logger.Info("gang scheduling is enabled, propagating task groups annotation to child", "name", child.GetName(), "namespace", child.GetNamespace()) + + err := propagateTaskGroupsAnnotation(parent, child) + if err != nil { + logger.Error(err, "failed to add gang scheduling related annotations to object, "+ + "gang scheduling will not be enabled for this workload", + "name", child.GetName(), "namespace", child.GetNamespace()) + return + } + if groupName != "" { + addTaskGroupNameAnnotation(child, groupName) + } + } } -func (batch *SchedulerManager) GetScheduler() (schedulerinterface.BatchScheduler, error) { - return batch.scheduler, nil +func (yf *YuniKornSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) { + return &YuniKornScheduler{}, nil } -func (batch *SchedulerManager) ConfigureReconciler(b *builder.Builder) *builder.Builder { - batch.factory.ConfigureReconciler(b) - return b +func (yf *YuniKornSchedulerFactory) AddToScheme(_ *runtime.Scheme) { + // No extra scheme needs to be registered } -func (batch *SchedulerManager) AddToScheme(scheme *runtime.Scheme) { - batch.factory.AddToScheme(scheme) +func (yf *YuniKornSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder { + return b } diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 44a84ba2652..65d561c7eb0 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -927,8 +927,8 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1 // check if the batch scheduler integration is enabled // call the scheduler plugin if so if r.options.BatchSchedulerManager != nil { - if scheduler, err := r.options.BatchSchedulerManager.GetSchedulerForCluster(); err == nil { - scheduler.AddMetadataToPod(ctx, &instance, utils.RayNodeHeadGroupLabelValue, &pod) + 
if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { + scheduler.AddMetadataToChildResource(ctx, &instance, &pod, utils.RayNodeHeadGroupLabelValue) } else { return err } @@ -950,8 +950,8 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray // build the pod then create it pod := r.buildWorkerPod(ctx, instance, worker) if r.options.BatchSchedulerManager != nil { - if scheduler, err := r.options.BatchSchedulerManager.GetSchedulerForCluster(); err == nil { - scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod) + if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { + scheduler.AddMetadataToChildResource(ctx, &instance, &pod, worker.GroupName) } else { return err } diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 6985a1858e6..fe01233e6ca 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -180,16 +180,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) break } - if r.options.BatchSchedulerManager != nil { - if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { - if err := scheduler.DoBatchSchedulingOnSubmission(ctx, rayJobInstance); err != nil { - return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err - } - } else { - return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err - } - } - if shouldUpdate := checkActiveDeadlineAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate { break } @@ -739,15 +729,6 @@ func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance * return err } - // Add batch scheduler metadata to submitter job - if r.options.BatchSchedulerManager != nil { - if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { - scheduler.AddMetadataToChildResource(ctx, rayJobInstance, utils.RayNodeSubmitterGroupLabelValue, job) - } else { - logger.Error(err, "Failed to get batch scheduler for adding metadata to submitter job") - } - } - // Create the Kubernetes Job if err := r.Client.Create(ctx, job); err != nil { logger.Error(err, "Failed to create new submitter Kubernetes Job for RayJob") From 92e7d955ab07d302f726c0172518d29ca2eae190 Mon Sep 17 00:00:00 2001 From: win5923 Date: Wed, 24 Sep 2025 14:29:13 +0000 Subject: [PATCH 04/12] Revert interface migration Signed-off-by: win5923 --- .../ray/batchscheduler/interface/interface.go | 11 +++++++ .../kai-scheduler/kai_scheduler.go | 27 ++++++----------- .../kai-scheduler/kai_scheduler_test.go | 24 +++++++-------- .../scheduler-plugins/scheduler_plugins.go | 29 ++++++------------- .../scheduler_plugins_test.go | 4 +-- .../volcano/volcano_scheduler.go | 13 +++++++++ .../yunikorn/yunikorn_scheduler.go | 26 +++++++++++++++++ .../controllers/ray/raycluster_controller.go | 4 +-- 8 files changed, 84 insertions(+), 54 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/interface/interface.go b/ray-operator/controllers/ray/batchscheduler/interface/interface.go index 8f93a938a39..066ea79f102 100644 --- a/ray-operator/controllers/ray/batchscheduler/interface/interface.go +++ b/ray-operator/controllers/ray/batchscheduler/interface/interface.go @@ -3,11 +3,14 @@ package schedulerinterface import ( "context" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" 
"sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" ) // BatchScheduler manages submitting RayCluster pods to a third-party scheduler. @@ -20,6 +23,11 @@ type BatchScheduler interface { // For most batch schedulers, this results in the creation of a PodGroup. DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error + // AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler. + // For example, setting labels for queues / priority, and setting schedulerName. + // This function will be removed once Rayjob Volcano scheduler integration is completed. + AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod) + // AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler. // For example, setting labels for queues / priority, and setting schedulerName. AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string) @@ -55,6 +63,9 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, return nil } +func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) { +} + func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { } diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go index 2257127db5d..863d24fcf3e 100644 --- a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go @@ -18,6 +18,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" ) @@ -37,33 +38,23 @@ func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1 return nil } -func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, _ string) { +func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) { logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler") - addSchedulerNameToObject(child, k.Name()) + pod.Spec.SchedulerName = k.Name() - parentLabel := parent.GetLabels() - queue, ok := parentLabel[QueueLabelName] + queue, ok := app.Labels[QueueLabelName] if !ok || queue == "" { - logger.Info("Queue label missing from parent; child will remain pending", + logger.Info("Queue label missing from RayCluster; pods will remain pending", "requiredLabel", QueueLabelName) return } - - childLabels := child.GetLabels() - if childLabels == nil { - childLabels = make(map[string]string) + if pod.Labels == nil { + pod.Labels = make(map[string]string) } - childLabels[QueueLabelName] = queue - child.SetLabels(childLabels) + pod.Labels[QueueLabelName] = queue } -func addSchedulerNameToObject(obj metav1.Object, schedulerName string) { - switch obj := obj.(type) { - case *corev1.Pod: - obj.Spec.SchedulerName = schedulerName - case *corev1.PodTemplateSpec: - obj.Spec.SchedulerName = schedulerName - } +func (k *KaiScheduler) 
AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { } func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) { diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go index 6b1213ee6f7..ed9eaf9549a 100644 --- a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go @@ -41,7 +41,7 @@ func createTestPod() *corev1.Pod { } } -func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) { +func TestAddMetadataToPod_WithQueueLabel(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -52,8 +52,8 @@ func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) { }) pod := createTestPod() - // Call AddMetadataToChildResource - scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") + // Call AddMetadataToPod + scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod) // Assert scheduler name is set to kai-scheduler a.Equal("kai-scheduler", pod.Spec.SchedulerName) @@ -63,7 +63,7 @@ func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) { a.Equal("test-queue", pod.Labels[QueueLabelName]) } -func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) { +func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -72,8 +72,8 @@ func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) { rayCluster := createTestRayCluster(map[string]string{}) pod := createTestPod() - // Call AddMetadataToChildResource - scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") + // Call AddMetadataToPod + scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod) // Assert scheduler name is still set (always required) a.Equal("kai-scheduler", pod.Spec.SchedulerName) @@ -85,7 +85,7 @@ func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) { } } -func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) { +func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -96,8 +96,8 @@ func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) { }) pod := createTestPod() - // Call AddMetadataToChildResource - scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") + // Call AddMetadataToPod + scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod) // Assert scheduler name is still set a.Equal("kai-scheduler", pod.Spec.SchedulerName) @@ -109,7 +109,7 @@ func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) { } } -func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) { +func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) { a := assert.New(t) scheduler := &KaiScheduler{} ctx := context.Background() @@ -126,8 +126,8 @@ func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) { "app": "ray", } - // Call AddMetadataToChildResource - scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group") + // Call AddMetadataToPod + scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod) // Assert scheduler name is set a.Equal("kai-scheduler", pod.Spec.SchedulerName) diff --git 
a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go index 73b1ec6b65a..7f8cc178531 100644 --- a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go +++ b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go @@ -93,32 +93,21 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, objec return nil } -// AddMetadataToPod adds essential labels and annotations to the child resource. +// AddMetadataToPod adds essential labels and annotations to the Ray pod // the scheduler needs these labels and annotations in order to do the scheduling properly -func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, _ string) { - // when gang scheduling is enabled, extra labels need to be added to all child resources - if k.isGangSchedulingEnabled(parent) { - labels := child.GetLabels() - if labels == nil { - labels = make(map[string]string) - } - labels[kubeSchedulerPodGroupLabelKey] = parent.GetName() - child.SetLabels(labels) +func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) { + // when gang scheduling is enabled, extra labels need to be added to all pods + if k.isGangSchedulingEnabled(rayCluster) { + pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name } - addSchedulerNameToObject(child, k.Name()) + pod.Spec.SchedulerName = k.Name() } -func addSchedulerNameToObject(obj metav1.Object, schedulerName string) { - switch obj := obj.(type) { - case *corev1.Pod: - obj.Spec.SchedulerName = schedulerName - case *corev1.PodTemplateSpec: - obj.Spec.SchedulerName = schedulerName - } +func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { } -func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool { - _, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled] +func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool { + _, exist := app.Labels[utils.RayGangSchedulingEnabled] return exist } diff --git a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go index e2201b6cf71..f5f58977467 100644 --- a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go +++ b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go @@ -117,7 +117,7 @@ func TestCreatePodGroupWithMultipleHosts(t *testing.T) { a.Equal(int32(5), podGroup.Spec.MinMember) } -func TestAddMetadataToChildResource(t *testing.T) { +func TestAddMetadataToPod(t *testing.T) { tests := []struct { name string enableGang bool @@ -150,7 +150,7 @@ func TestAddMetadataToChildResource(t *testing.T) { } scheduler := &KubeScheduler{} - scheduler.AddMetadataToChildResource(context.TODO(), &cluster, pod, "worker") + scheduler.AddMetadataToPod(context.TODO(), &cluster, "worker", pod) if tt.enableGang { a.Equal(cluster.Name, pod.Labels[kubeSchedulerPodGroupLabelKey]) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index eb0b3b8eafa..08af3107576 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ 
b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -213,6 +213,19 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa addSchedulerName(child, v.Name()) } +// This function will be removed in interface migration PR +func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) { + pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app) + pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName + if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok { + pod.Labels[QueueNameLabelKey] = queue + } + if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok { + pod.Spec.PriorityClassName = priorityClassName + } + pod.Spec.SchedulerName = v.Name() +} + func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) { if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil { return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err) diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go index 37cb5a5c78b..a7bcfda8d64 100644 --- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go @@ -122,6 +122,32 @@ func (y *YuniKornScheduler) isGangSchedulingEnabled(obj metav1.Object) bool { return exist } +// AddMetadataToPod adds essential labels and annotations to the Ray pod +// the yunikorn scheduler needs these labels and annotations in order to do the scheduling properly +func (y *YuniKornScheduler) AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod) { + logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName) + // the applicationID and queue name must be provided in the labels + populateLabelsFromObject(rayCluster, pod, RayApplicationIDLabelName, YuniKornPodApplicationIDLabelName) + populateLabelsFromObject(rayCluster, pod, RayApplicationQueueLabelName, YuniKornPodQueueLabelName) + addSchedulerNameToObject(pod, y.Name()) + + // when gang scheduling is enabled, extra annotations need to be added to all pods + if y.isGangSchedulingEnabled(rayCluster) { + // populate the taskGroups info to each pod + err := propagateTaskGroupsAnnotation(rayCluster, pod) + if err != nil { + logger.Error(err, "failed to add gang scheduling related annotations to pod, "+ + "gang scheduling will not be enabled for this workload", + "name", pod.Name, "namespace", pod.Namespace) + return + } + + // set the task group name based on the head or worker group name + // the group name for the head and each of the worker group should be different + pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName + } +} + func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string) { logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 65d561c7eb0..17d6616f039 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -928,7 +928,7 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, 
instance rayv1 // call the scheduler plugin if so if r.options.BatchSchedulerManager != nil { if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { - scheduler.AddMetadataToChildResource(ctx, &instance, &pod, utils.RayNodeHeadGroupLabelValue) + scheduler.AddMetadataToPod(ctx, &instance, utils.RayNodeHeadGroupLabelValue, &pod) } else { return err } @@ -951,7 +951,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray pod := r.buildWorkerPod(ctx, instance, worker) if r.options.BatchSchedulerManager != nil { if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { - scheduler.AddMetadataToChildResource(ctx, &instance, &pod, worker.GroupName) + scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod) } else { return err } From dd38daa60cca6904d457eae715fde7fb04b1910b Mon Sep 17 00:00:00 2001 From: win5923 Date: Thu, 25 Sep 2025 15:30:54 +0000 Subject: [PATCH 05/12] Append submitter resources Signed-off-by: win5923 --- .../volcano/volcano_scheduler.go | 18 ++- .../volcano/volcano_scheduler_test.go | 134 +++++++++++++++--- .../controllers/ray/rayjob_controller.go | 2 +- ray-operator/controllers/ray/utils/util.go | 6 +- 4 files changed, 130 insertions(+), 30 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 08af3107576..638cd2c837a 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -20,6 +20,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) @@ -65,17 +66,25 @@ func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster } // handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob -// The submitter pod is intentionally excluded from MinMember calculation. -// Including it before the RayCluster is ready may prevent the PodGroup from -// ever meeting the MinMember requirement, leaving all pods stuck in Pending. func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error { if rayJob.Spec.RayClusterSpec == nil { return fmt.Errorf("gang scheduling does not support RayJob %s/%s referencing an existing RayCluster", rayJob.Namespace, rayJob.Name) } + totalResourceList := []corev1.ResourceList{{}} minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec) + totalResourceList = append(totalResourceList, totalResource) - return v.syncPodGroup(ctx, rayJob, minMember, totalResource) + // MinMember intentionally excludes the submitter pod to avoid a startup deadlock + // (submitter waits for cluster; gang would wait for submitter). We still add the + // submitter's resource requests into MinResources so capacity is reserved. 
+ if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode { + submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec) + submitResource := utils.CalculatePodResource(submitterTemplate.Spec) + totalResourceList = append(totalResourceList, submitResource) + } + + return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList)) } func getAppPodGroupName(object metav1.Object) string { @@ -195,7 +204,6 @@ func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalR }, } - // Copy scheduling labels to PodGroup spec if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok { podGroup.Spec.Queue = queue } diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go index 134af5e0371..934df7b7a30 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go @@ -5,10 +5,14 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1" volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -124,7 +128,7 @@ func createTestRayJob(numOfHosts int32) rayv1.RayJob { Namespace: "default", Labels: map[string]string{ QueueNameLabelKey: "test-queue", - utils.RayPriorityClassName: "high-priority", + utils.RayPriorityClassName: "test-priority", }, }, Spec: rayv1.RayJobSpec{ @@ -204,32 +208,120 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) { func TestCreatePodGroupForRayJob(t *testing.T) { a := assert.New(t) + ctx := context.Background() - rayJob := createTestRayJob(1) + scheme := runtime.NewScheme() + a.NoError(rayv1.AddToScheme(scheme)) + a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme)) + fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build() + scheduler := &VolcanoBatchScheduler{cli: fakeCli} - // Create RayCluster from RayJob spec for calculation - rayCluster := &rayv1.RayCluster{ - Spec: *rayJob.Spec.RayClusterSpec, - } + t.Run("No submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(1) + rayJob.Spec.SubmissionMode = rayv1.HTTPMode - minMember := utils.CalculateDesiredReplicas(context.Background(), rayCluster) + 1 - totalResource := utils.CalculateDesiredResources(rayCluster) - pg := createPodGroup(&rayJob, getAppPodGroupName(&rayJob), minMember, totalResource) + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) - a.Equal(rayJob.Namespace, pg.Namespace) - a.Equal("ray-rayjob-sample-pg", pg.Name) + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) - // Verify owner reference is set to RayJob - a.Len(pg.OwnerReferences, 1) - a.Equal("RayJob", pg.OwnerReferences[0].Kind) - a.Equal(rayJob.Name, pg.OwnerReferences[0].Name) + // 1 head + 2 workers (desired, not min replicas) + a.Equal(int32(3), pg.Spec.MinMember) + // 256m * 3 (requests, not limits) + a.Equal("768m", pg.Spec.MinResources.Cpu().String()) + // 256m * 3 (requests, not limits) + 
a.Equal("768Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) - // Verify queue and priority class are set from RayJob labels - a.Equal("test-queue", pg.Spec.Queue) - a.Equal("high-priority", pg.Spec.PriorityClassName) + t.Run("K8sJobMode includes submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(1) + rayJob.Spec.SubmissionMode = rayv1.K8sJobMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) + + // 1 head + 2 workers (desired, not min replicas) + a.Equal(int32(3), pg.Spec.MinMember) + // 768m + 500m = 1268m + a.Equal("1268m", pg.Spec.MinResources.Cpu().String()) + // 768Mi + 200Mi = 968Mi + a.Equal("968Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) +} - // 1 head + 2 workers (desired, not min replicas) - a.Equal(int32(3), pg.Spec.MinMember) +func TestCreatePodGroupForRayJob_NumOfHosts2(t *testing.T) { + a := assert.New(t) + ctx := context.Background() + + scheme := runtime.NewScheme() + a.NoError(rayv1.AddToScheme(scheme)) + a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme)) + fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build() + scheduler := &VolcanoBatchScheduler{cli: fakeCli} + + t.Run("No submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(2) + rayJob.Spec.SubmissionMode = rayv1.HTTPMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) + + // 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head + // 2 * 2 + 1 = 5 + a.Equal(int32(5), pg.Spec.MinMember) + // 256m * (2 (requests, not limits) * 2 (num of hosts) + 1 head) + // 256m * 5 = 1280m + a.Equal("1280m", pg.Spec.MinResources.Cpu().String()) + // 256Mi * (2 (requests, not limits) * 2 (num of hosts) + 1 head) + // 256Mi * 5 = 1280Mi + a.Equal("1280Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) + + t.Run("K8sJobMode includes submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(2) + rayJob.Spec.SubmissionMode = rayv1.K8sJobMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) + + // 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head + // 2 * 2 + 1 = 5 + a.Equal(int32(5), pg.Spec.MinMember) + // 1280m + 500m = 1780m + a.Equal("1780m", pg.Spec.MinResources.Cpu().String()) + // 1280Mi + 200Mi = 1480Mi + a.Equal("1480Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + 
a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) } func TestAddMetadataToSubmitterPod(t *testing.T) { @@ -253,7 +345,7 @@ func TestAddMetadataToSubmitterPod(t *testing.T) { // Check labels a.Equal("test-queue", submitterTemplate.Labels[QueueNameLabelKey]) - a.Equal("high-priority", submitterTemplate.Labels[utils.RayPriorityClassName]) + a.Equal("test-priority", submitterTemplate.Labels[utils.RayPriorityClassName]) // Check scheduler name a.Equal(pluginName, submitterTemplate.Spec.SchedulerName) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index fe01233e6ca..e923437350b 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -937,7 +937,7 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra if err != nil { return nil, err } - if r.options.BatchSchedulerManager != nil && rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode { + if r.options.BatchSchedulerManager != nil { if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { // Group name is only used for individual pods to specify their task group ("headgroup", "worker-group-1", etc.). // RayCluster contains multiple groups, so we pass an empty string. diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index cf6b9066323..3bb63f79189 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -440,7 +440,7 @@ func CalculateDesiredResources(cluster *rayv1.RayCluster) corev1.ResourceList { desiredResourcesList = append(desiredResourcesList, podResource) } } - return sumResourceList(desiredResourcesList) + return SumResourceList(desiredResourcesList) } func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList { @@ -454,7 +454,7 @@ func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList { minResourcesList = append(minResourcesList, podResource) } } - return sumResourceList(minResourcesList) + return SumResourceList(minResourcesList) } // calculateReplicaResource adjusts the resource quantities in a given ResourceList @@ -503,7 +503,7 @@ func ConvertResourceListToMapString(resourceList corev1.ResourceList) map[string return result } -func sumResourceList(list []corev1.ResourceList) corev1.ResourceList { +func SumResourceList(list []corev1.ResourceList) corev1.ResourceList { totalResource := corev1.ResourceList{} for _, l := range list { for name, quantity := range l { From a33a3b7a4b2f5027214ab77ced0bcb2c77e57c1f Mon Sep 17 00:00:00 2001 From: win5923 Date: Fri, 26 Sep 2025 14:19:37 +0000 Subject: [PATCH 06/12] Remove empty ResourceList Signed-off-by: win5923 --- .../controllers/ray/batchscheduler/volcano/volcano_scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 638cd2c837a..9f5c0cb0f76 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -71,7 +71,7 @@ func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1. 
return fmt.Errorf("gang scheduling does not support RayJob %s/%s referencing an existing RayCluster", rayJob.Namespace, rayJob.Name) } - totalResourceList := []corev1.ResourceList{{}} + var totalResourceList []corev1.ResourceList minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec) totalResourceList = append(totalResourceList, totalResource) From 42479c252f2f209b0f6bc851aebd0d5e072402d5 Mon Sep 17 00:00:00 2001 From: win5923 Date: Sat, 27 Sep 2025 15:08:47 +0000 Subject: [PATCH 07/12] Add K8sJobMode check to prevent YuniKorn from adding submitter pod annotations Signed-off-by: win5923 --- .../ray/batchscheduler/volcano/volcano_scheduler.go | 4 ++-- ray-operator/controllers/ray/rayjob_controller.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 9f5c0cb0f76..dd8741f1a41 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -80,8 +80,8 @@ func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1. // submitter's resource requests into MinResources so capacity is reserved. if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode { submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec) - submitResource := utils.CalculatePodResource(submitterTemplate.Spec) - totalResourceList = append(totalResourceList, submitResource) + submitterResource := utils.CalculatePodResource(submitterTemplate.Spec) + totalResourceList = append(totalResourceList, submitterResource) } return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList)) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index e923437350b..fe01233e6ca 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -937,7 +937,7 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra if err != nil { return nil, err } - if r.options.BatchSchedulerManager != nil { + if r.options.BatchSchedulerManager != nil && rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode { if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { // Group name is only used for individual pods to specify their task group ("headgroup", "worker-group-1", etc.). // RayCluster contains multiple groups, so we pass an empty string. 
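The patches above (05 through 07) change how the Volcano PodGroup for a RayJob accounts for the submitter pod: the submitter is left out of MinMember so the gang is not blocked waiting on a pod that itself waits for the cluster, while its resource requests are still folded into MinResources so the queue reserves capacity for it. As an illustration only (not part of the patch series), the standalone Go sketch below assumes nothing beyond the upstream k8s.io/api and k8s.io/apimachinery modules and mirrors the role utils.SumResourceList plays when the cluster total is combined with the submitter requests; the 768m/768Mi cluster figures and the 500m/200Mi submitter figures are taken from the arithmetic in the tests.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// sumResourceLists adds up the quantities per resource name across the given
// lists, mirroring how the PodGroup MinResources is assembled in the patches above.
func sumResourceLists(lists ...corev1.ResourceList) corev1.ResourceList {
	total := corev1.ResourceList{}
	for _, list := range lists {
		for name, quantity := range list {
			if current, ok := total[name]; ok {
				current.Add(quantity)
				total[name] = current
			} else {
				total[name] = quantity.DeepCopy()
			}
		}
	}
	return total
}

func main() {
	// Requests of the embedded RayCluster: 1 head + 2 workers at 256m / 256Mi each.
	clusterTotal := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("768m"),
		corev1.ResourceMemory: resource.MustParse("768Mi"),
	}
	// Submitter pod requests as implied by the test expectations (1268m - 768m, 968Mi - 768Mi).
	submitter := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("500m"),
		corev1.ResourceMemory: resource.MustParse("200Mi"),
	}

	minResources := sumResourceLists(clusterTotal, submitter)
	fmt.Println(minResources.Cpu().String(), minResources.Memory().String()) // prints: 1268m 968Mi
}

MinMember for the same RayJob stays at 3 (head plus two workers), matching the assertions in TestCreatePodGroupForRayJob.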
From 597e57d6ddf2e2f74ee9bdd8105c62286a376e1c Mon Sep 17 00:00:00 2001 From: win5923 Date: Sun, 5 Oct 2025 01:41:40 +0800 Subject: [PATCH 08/12] Apply Troy's comments Signed-off-by: win5923 --- .../ray/batchscheduler/volcano/volcano_scheduler.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index dd8741f1a41..f0ab9067fd0 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -114,13 +114,7 @@ func populateAnnotations(parent metav1.Object, child metav1.Object, groupName st annotations = make(map[string]string) } annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(parent) - - switch child.(type) { - case *corev1.Pod: - annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName - case *corev1.PodTemplateSpec: - annotations[volcanobatchv1alpha1.TaskSpecKey] = utils.RayNodeSubmitterGroupLabelValue - } + annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName child.SetAnnotations(annotations) } @@ -144,7 +138,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.O podGroup := volcanoschedulingv1beta1.PodGroup{} if err := v.cli.Get(ctx, types.NamespacedName{Namespace: owner.GetNamespace(), Name: podGroupName}, &podGroup); err != nil { if !errors.IsNotFound(err) { - logger.Error(err, "failed to get PodGroup", "name", podGroupName) + logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace()) return err } From a64c011d2ec25853cfc4d4c24af17a28170e03b9 Mon Sep 17 00:00:00 2001 From: win5923 Date: Mon, 6 Oct 2025 03:03:47 +0800 Subject: [PATCH 09/12] Log more information Signed-off-by: win5923 --- .../ray/batchscheduler/volcano/volcano_scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index f0ab9067fd0..33921820b32 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -149,7 +149,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.O return nil } - logger.Error(err, "failed to create PodGroup", "name", podGroupName) + logger.Error(err, "failed to create PodGroup", "name", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace()) return err } } else { @@ -157,7 +157,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.O podGroup.Spec.MinMember = size podGroup.Spec.MinResources = &totalResource if err := v.cli.Update(ctx, &podGroup); err != nil { - logger.Error(err, "failed to update PodGroup", "name", podGroupName) + logger.Error(err, "failed to update PodGroup", "name", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace()) return err } } From b8cb224b742a977489137ade669932bb1ee9736c Mon Sep 17 00:00:00 2001 
From: win5923 Date: Wed, 8 Oct 2025 15:51:15 +0000 Subject: [PATCH 10/12] Add sidecarmode Signed-off-by: win5923 --- .../volcano/volcano_scheduler.go | 2 +- .../volcano/volcano_scheduler_test.go | 47 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 33921820b32..1ced4864f78 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -78,7 +78,7 @@ func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1. // MinMember intentionally excludes the submitter pod to avoid a startup deadlock // (submitter waits for cluster; gang would wait for submitter). We still add the // submitter's resource requests into MinResources so capacity is reserved. - if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode { + if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode || rayJob.Spec.SubmissionMode == rayv1.SidecarMode { submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec) submitterResource := utils.CalculatePodResource(submitterTemplate.Spec) totalResourceList = append(totalResourceList, submitterResource) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go index 934df7b7a30..d137c01f76c 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go @@ -261,6 +261,29 @@ func TestCreatePodGroupForRayJob(t *testing.T) { a.Len(pg.OwnerReferences, 1) a.Equal("RayJob", pg.OwnerReferences[0].Kind) }) + + t.Run("SidecarMode includes submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(1) + rayJob.Spec.SubmissionMode = rayv1.SidecarMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) + + // 1 head + 2 workers (desired, not min replicas) + a.Equal(int32(3), pg.Spec.MinMember) + // 768m + 500m = 1268m + a.Equal("1268m", pg.Spec.MinResources.Cpu().String()) + // 768Mi + 200Mi = 968Mi + a.Equal("968Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) } func TestCreatePodGroupForRayJob_NumOfHosts2(t *testing.T) { @@ -322,6 +345,30 @@ func TestCreatePodGroupForRayJob_NumOfHosts2(t *testing.T) { a.Len(pg.OwnerReferences, 1) a.Equal("RayJob", pg.OwnerReferences[0].Kind) }) + + t.Run("SidecarMode includes submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(2) + rayJob.Spec.SubmissionMode = rayv1.SidecarMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) + + // 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head + // 2 * 2 + 1 = 5 + a.Equal(int32(5), pg.Spec.MinMember) + // 1280m + 500m = 1780m + a.Equal("1780m", 
pg.Spec.MinResources.Cpu().String()) + // 1280Mi + 200Mi = 1480Mi + a.Equal("1480Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) } func TestAddMetadataToSubmitterPod(t *testing.T) { From 84d39f94575738e210862b13827654b8ba01d69c Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Thu, 9 Oct 2025 17:39:20 +0800 Subject: [PATCH 11/12] fix Signed-off-by: Future-Outlier --- .../volcano/volcano_scheduler.go | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 1ced4864f78..4fdb487c83a 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -78,13 +78,23 @@ func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1. // MinMember intentionally excludes the submitter pod to avoid a startup deadlock // (submitter waits for cluster; gang would wait for submitter). We still add the // submitter's resource requests into MinResources so capacity is reserved. - if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode || rayJob.Spec.SubmissionMode == rayv1.SidecarMode { + submitterResource := getSubmitterResource(rayJob) + totalResourceList = append(totalResourceList, submitterResource) + return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList)) +} + +func getSubmitterResource(rayJob *rayv1.RayJob) corev1.ResourceList { + if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode { submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec) - submitterResource := utils.CalculatePodResource(submitterTemplate.Spec) - totalResourceList = append(totalResourceList, submitterResource) + return utils.CalculatePodResource(submitterTemplate.Spec) + } else if rayJob.Spec.SubmissionMode == rayv1.SidecarMode { + submitterContainer := common.GetDefaultSubmitterContainer(rayJob.Spec.RayClusterSpec) + return corev1.ResourceList{ + corev1.ResourceCPU: submitterContainer.Resources.Requests[corev1.ResourceCPU], + corev1.ResourceMemory: submitterContainer.Resources.Requests[corev1.ResourceMemory], + } } - - return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList)) + return corev1.ResourceList{} } func getAppPodGroupName(object metav1.Object) string { From cf6b48bb361aae7decfe7309e66a28495031a3f5 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Thu, 9 Oct 2025 20:36:59 +0800 Subject: [PATCH 12/12] Update Rueian's advice Signed-off-by: Future-Outlier Co-authored-by: Rueian --- .../ray/batchscheduler/volcano/volcano_scheduler.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 4fdb487c83a..44ccb55d5d8 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -89,10 +89,13 @@ func getSubmitterResource(rayJob *rayv1.RayJob) corev1.ResourceList { return utils.CalculatePodResource(submitterTemplate.Spec) } else if rayJob.Spec.SubmissionMode == rayv1.SidecarMode { submitterContainer := 
common.GetDefaultSubmitterContainer(rayJob.Spec.RayClusterSpec) - return corev1.ResourceList{ - corev1.ResourceCPU: submitterContainer.Resources.Requests[corev1.ResourceCPU], - corev1.ResourceMemory: submitterContainer.Resources.Requests[corev1.ResourceMemory], + containerResource := submitterContainer.Resources.Requests + for name, quantity := range submitterContainer.Resources.Limits { + if _, ok := containerResource[name]; !ok { + containerResource[name] = quantity + } } + return containerResource } return corev1.ResourceList{} }
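For readers skimming the final state: the SidecarMode branch above derives the submitter's effective requests by falling back to the container's limits for any resource that has no explicit request. The sketch below restates that merge as a standalone helper; mergeRequestsWithLimitDefaults and the sample quantities are illustrative only, and it copies into a fresh map rather than merging in place, which is a defensive variation, not a claim about what the patch requires.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// mergeRequestsWithLimitDefaults returns the effective resource requests for a
// container: explicit requests win, and a resource that only declares a limit
// falls back to that limit. The result is a fresh map, so the container's own
// Requests map is left untouched.
func mergeRequestsWithLimitDefaults(c corev1.Container) corev1.ResourceList {
	effective := corev1.ResourceList{}
	for name, q := range c.Resources.Requests {
		effective[name] = q.DeepCopy()
	}
	for name, q := range c.Resources.Limits {
		if _, ok := effective[name]; !ok {
			effective[name] = q.DeepCopy()
		}
	}
	return effective
}

func main() {
	// Hypothetical sidecar submitter container: CPU has an explicit request,
	// memory only has a limit and therefore falls back to it.
	submitter := corev1.Container{
		Name: "ray-job-submitter",
		Resources: corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU: resource.MustParse("500m"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("1"),
				corev1.ResourceMemory: resource.MustParse("200Mi"),
			},
		},
	}
	effective := mergeRequestsWithLimitDefaults(submitter)
	fmt.Println(effective.Cpu().String(), effective.Memory().String()) // 500m 200Mi
}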