Merged

Commits (60)
6c0d860
modify batch scheduler interface to support CRD other than RayCluster
troychiu Aug 7, 2025
ebf0cb3
update kai-scheduler to fit batchscheduler interface
owenowenisme Aug 17, 2025
7d7823b
rename GetSchedulerForCluster to GetScheduler
owenowenisme Aug 18, 2025
9e12f53
update
owenowenisme Aug 18, 2025
f5b7df1
update
owenowenisme Aug 19, 2025
7d8c4e8
rename funcs
owenowenisme Aug 19, 2025
a9b6b95
update
owenowenisme Aug 19, 2025
0e9cbd8
add unit test
owenowenisme Aug 20, 2025
4eb38e5
update
owenowenisme Aug 20, 2025
27ccfa2
update sample yaml
owenowenisme Aug 21, 2025
7bf11d1
Update ray-operator/controllers/ray/rayjob_controller.go
owenowenisme Aug 21, 2025
1abda9c
Update ray-operator/controllers/ray/rayjob_controller.go
owenowenisme Aug 21, 2025
da6efc2
remove redundant update rayjob
owenowenisme Aug 21, 2025
51a2d1e
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 23, 2025
0def3ae
update yaml
owenowenisme Aug 23, 2025
6c77276
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 27, 2025
ce99f4e
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 27, 2025
4a89804
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 27, 2025
469e4a8
add logger back
owenowenisme Aug 27, 2025
07e1f0d
rename AddMetadataToChildResourceFromRayJob to AddMetadataToChildReso…
owenowenisme Aug 27, 2025
0bea9a1
add more unit test
owenowenisme Aug 27, 2025
e7e7f20
update unit test
owenowenisme Aug 27, 2025
d44b8be
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
3393d9f
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
e28caa5
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
22f2679
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
dcc6f1c
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
668d6a2
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
f1e0c6b
remove comment in sample yaml
owenowenisme Sep 1, 2025
7a2d474
update job controller to only use rayjob controller to set taskgroup …
owenowenisme Sep 1, 2025
c2f92a1
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 1, 2025
ce9a513
rename AddMetadataToChildResourceFromRayCluster to AddMetadataToPodFr…
owenowenisme Sep 4, 2025
0551ac1
split AddMetadataToClildResources into 2 functions for RayCluster and…
owenowenisme Sep 4, 2025
8c6d088
Update ray-operator/controllers/ray/batchscheduler/interface/interfac…
owenowenisme Sep 4, 2025
effd7bb
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Sep 4, 2025
12a1b0c
update log
owenowenisme Sep 4, 2025
c7e88b4
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 6, 2025
ee9a61b
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 7, 2025
2b39ede
simplify interface
owenowenisme Sep 7, 2025
1a414f0
add comment
owenowenisme Sep 7, 2025
25eebf2
rename func
owenowenisme Sep 7, 2025
110ba38
rename func
owenowenisme Sep 7, 2025
83954d9
rename AddMetadataToChildResources to AddMetadataToChildResource
owenowenisme Sep 8, 2025
adf84e6
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Sep 9, 2025
afa537f
remove redundant annotation existing check
owenowenisme Sep 9, 2025
68e6cbe
add check if label exist before populate to child
owenowenisme Sep 9, 2025
8f9aef5
rename RayClusterGangSchedulingEnabled
owenowenisme Sep 10, 2025
74df79d
simplify propagateTaskGroupsAnnotation logic
owenowenisme Sep 10, 2025
0d03a21
update
owenowenisme Sep 10, 2025
2fe091e
update comment
owenowenisme Sep 10, 2025
cd09b3e
simplfy annotation propagation logic
owenowenisme Sep 10, 2025
2c6e649
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Sep 10, 2025
2646d20
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 15, 2025
ff38824
use-metav1-obj-instead-of-client-object
owenowenisme Sep 15, 2025
69a0531
resolve comment of pattern consistency
owenowenisme Sep 16, 2025
9a6c707
remove repetitive code in create task group
owenowenisme Sep 16, 2025
7cc7c2d
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 17, 2025
0646149
fix unit test
owenowenisme Sep 17, 2025
3066c41
add comment back
owenowenisme Sep 17, 2025
906ab83
Refactor RayJob submitter template handling
owenowenisme Sep 18, 2025
Files changed
157 changes: 157 additions & 0 deletions ray-operator/config/samples/ray-job.yunikorn-scheduler.yaml
@@ -0,0 +1,157 @@
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-yunikorn-scheduler
  labels:
    ray.io/gang-scheduling-enabled: "true"
    yunikorn.apache.org/app-id: test-yunikorn-job-0
@win5923 (Collaborator) commented on Aug 23, 2025:
yunikorn.apache.org/app-id should be set to the same value as RayJob.Name, so that when the RayJob is renamed, the app-id is updated with it. This makes it easier for users to understand why a newly created RayJob is stuck in the Accepted state but not yet running.

Ref: https://docs.ray.io/en/latest/cluster/kubernetes/k8s-ecosystem/yunikorn.html#step-4-use-apache-yunikorn-for-gang-scheduling

@owenowenisme (Member, Author) replied:
Thanks, just updated.

    yunikorn.apache.org/queue: root.test
spec:
  # submissionMode specifies how RayJob submits the Ray job to the RayCluster.
  # The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job.
  # The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster.
  # submissionMode: "K8sJobMode"
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  # shutdownAfterJobFinishes: false

  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  # ttlSecondsAfterFinished: 10

  # activeDeadlineSeconds is the duration in seconds that the RayJob may be active before
  # KubeRay actively tries to terminate the RayJob; the value must be a positive integer.
  # activeDeadlineSeconds: 120

  # runtimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
  # See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details.
  # (New in KubeRay version 1.0.)
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  # suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created, and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. On transition to false, a new RayCluster will be created.
  # suspend: false

  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.46.0' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # The `rayStartParams` are used to configure the `ray start` command.
      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
      rayStartParams: {}
      # Pod template
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.46.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            # You set volumes at the Pod level, then mount them into containers inside that Pod.
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # The pod replicas in this group are of type "worker".
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # Logical group name; here it is called small-group, but the name can also describe the group's function.
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # Pod template
        template:
          spec:
            containers:
              - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
                image: rayproject/ray:2.46.0
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"

  # submitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
  # If submitterPodTemplate is specified, the first container is assumed to be the submitter container.
  # submitterPodTemplate:
  #   spec:
  #     restartPolicy: Never
  #     containers:
  #       - name: my-custom-rayjob-submitter-pod
  #         image: rayproject/ray:2.46.0
  #         # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]
  #         resources:
  #           limits:
  #             cpu: "1"
  #           requests:
  #             cpu: "200m"

###################### Ray code sample #################################
# This sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# It is mounted into the container and executed to show the Ray job at work.
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"
@@ -18,13 +18,17 @@ type BatchScheduler interface {
 	// https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/
 	Name() string

-	// DoBatchSchedulingOnSubmission handles submitting the RayCluster to the batch scheduler on creation / update
+	// DoBatchSchedulingOnSubmission handles submitting the RayCluster/RayJob/RayService to the batch scheduler on creation / update
 	// For most batch schedulers, this results in the creation of a PodGroup.
-	DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error
+	DoBatchSchedulingOnSubmission(ctx context.Context, object client.Object) error

-	// AddMetadataToPod enriches Pod specs with metadata necessary to tie them to the scheduler.
+	// AddMetadataToChildResourceFromRayCluster enriches child resource with metadata necessary to tie it to the scheduler.
 	// For example, setting labels for queues / priority, and setting schedulerName.
-	AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod)
+	AddMetadataToChildResourceFromRayCluster(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod)
+
+	// AddMetadataToChildResourceFromRayJob enriches child resource with metadata necessary to tie it to the scheduler.
+	// For example, setting labels for queues / priority, and setting schedulerName.
+	AddMetadataToChildResourceFromRayJob(ctx context.Context, rayJob *rayv1.RayJob, rayCluster *rayv1.RayCluster, submitterTemplate *corev1.PodTemplateSpec)
 }

 // BatchSchedulerFactory handles initial setup of the scheduler plugin by registering the
@@ -53,11 +57,15 @@ func (d *DefaultBatchScheduler) Name() string {
 	return GetDefaultPluginName()
 }

-func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
+func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ client.Object) error {
 	return nil
 }

-func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
+func (d *DefaultBatchScheduler) AddMetadataToChildResourceFromRayCluster(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
 }
+
+// AddMetadataToChildResourceFromRayJob adds necessary metadata from the RayJob to the RayCluster and submitter pod template for the BatchScheduler.
+func (d *DefaultBatchScheduler) AddMetadataToChildResourceFromRayJob(_ context.Context, _ *rayv1.RayJob, _ *rayv1.RayCluster, _ /*submitterTemplate*/ *corev1.PodTemplateSpec) {
+}

 func (df *DefaultBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (BatchScheduler, error) {
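Every plugin now receives a client.Object instead of a *rayv1.RayCluster, so implementations that only understand RayCluster guard with a type assertion, as the schedulers below do. A minimal sketch of a conforming plugin (the noopScheduler name and package layout are illustrative assumptions, not code from this PR):

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// noopScheduler satisfies the widened BatchScheduler interface.
type noopScheduler struct{}

func (s *noopScheduler) Name() string { return "noop" }

func (s *noopScheduler) DoBatchSchedulingOnSubmission(_ context.Context, object client.Object) error {
	// Reject kinds this plugin does not know how to schedule.
	if _, ok := object.(*rayv1.RayCluster); !ok {
		return fmt.Errorf("currently only RayCluster is supported, got %T", object)
	}
	return nil
}

func (s *noopScheduler) AddMetadataToChildResourceFromRayCluster(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
}

func (s *noopScheduler) AddMetadataToChildResourceFromRayJob(_ context.Context, _ *rayv1.RayJob, _ *rayv1.RayCluster, _ *corev1.PodTemplateSpec) {
}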
@@ -9,6 +9,7 @@ package kaischeduler

 import (
 	"context"
+	"fmt"

 	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
@@ -36,7 +37,13 @@ func GetPluginName() string { return "kai-scheduler" }

 func (k *KaiScheduler) Name() string { return GetPluginName() }

-func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
+func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, object client.Object) error {
+	_, ok := object.(*rayv1.RayCluster)
+	if !ok {
+		return fmt.Errorf("currently only RayCluster is supported, got %T", object)
+	}
+	// kai-scheduler doesn't require any resources to be created upfront;
+	// this is a no-op for this implementation
 	return nil
 }

@@ -56,6 +63,12 @@ func (k *KaiScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster
 	pod.Labels[QueueLabelName] = queue
 }

+func (k *KaiScheduler) AddMetadataToChildResourceFromRayCluster(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
+}
+
+func (k *KaiScheduler) AddMetadataToChildResourceFromRayJob(_ context.Context, _ *rayv1.RayJob, _ *rayv1.RayCluster, _ *corev1.PodTemplateSpec) {
+}
+
 func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
 	return &KaiScheduler{
 		log: logf.Log.WithName("kai-scheduler"),
@@ -69,7 +69,11 @@ func createPodGroup(ctx context.Context, app *rayv1.RayCluster) *v1alpha1.PodGroup
 	return podGroup
 }

-func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error {
+func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object client.Object) error {
+	app, ok := object.(*rayv1.RayCluster)
+	if !ok {
+		return fmt.Errorf("currently only RayCluster is supported, got %T", object)
+	}
 	if !k.isGangSchedulingEnabled(app) {
 		return nil
 	}
@@ -89,16 +93,19 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *
 	return nil
 }

-// AddMetadataToPod adds essential labels and annotations to the Ray pods
+// AddMetadataToChildResourceFromRayCluster adds essential labels and annotations to the Ray pods
 // the scheduler needs these labels and annotations in order to do the scheduling properly
-func (k *KubeScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
+func (k *KubeScheduler) AddMetadataToChildResourceFromRayCluster(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) {
 	// when gang scheduling is enabled, extra labels need to be added to all pods
-	if k.isGangSchedulingEnabled(app) {
-		pod.Labels[kubeSchedulerPodGroupLabelKey] = app.Name
+	if k.isGangSchedulingEnabled(rayCluster) {
+		pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name
 	}
 	pod.Spec.SchedulerName = k.Name()
 }

+func (k *KubeScheduler) AddMetadataToChildResourceFromRayJob(_ context.Context, _ *rayv1.RayJob, _ *rayv1.RayCluster, _ *corev1.PodTemplateSpec) {
+}
+
 func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
 	_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
 	return exist
@@ -150,7 +150,7 @@ func TestAddMetadataToPod(t *testing.T) {
 		}

 		scheduler := &KubeScheduler{}
-		scheduler.AddMetadataToPod(context.TODO(), &cluster, "worker", pod)
+		scheduler.AddMetadataToChildResourceFromRayCluster(context.TODO(), &cluster, "worker", pod)

 		if tt.enableGang {
 			a.Equal(cluster.Name, pod.Labels[kubeSchedulerPodGroupLabelKey])
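A sketch of a companion test for the new type guard, written to slot into this test file alongside TestAddMetadataToPod (the test name is an assumption; it relies only on the error path shown in the diff above):

func TestDoBatchSchedulingOnSubmissionRejectsNonRayCluster(t *testing.T) {
	a := assert.New(t)

	scheduler := &KubeScheduler{}
	// A RayJob is not a RayCluster, so the type assertion in
	// DoBatchSchedulingOnSubmission should return an error.
	err := scheduler.DoBatchSchedulingOnSubmission(context.TODO(), &rayv1.RayJob{})
	a.Error(err)
	a.Contains(err.Error(), "only RayCluster is supported")
}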
@@ -83,7 +83,7 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
 	return factory, nil
 }

-func (batch *SchedulerManager) GetSchedulerForCluster() (schedulerinterface.BatchScheduler, error) {
+func (batch *SchedulerManager) GetScheduler() (schedulerinterface.BatchScheduler, error) {
 	return batch.scheduler, nil
 }

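With the rename, call sites no longer imply a RayCluster-only scheduler. A rough sketch of how a reconciler might use it for the RayJob path; the helper name, variable names, and control flow are assumptions, not the PR's exact controller code:

package ray

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
)

// applyBatchSchedulerForRayJob is a hypothetical helper showing the call order.
func applyBatchSchedulerForRayJob(ctx context.Context, mgr *batchscheduler.SchedulerManager, rayJob *rayv1.RayJob, rayCluster *rayv1.RayCluster, submitterTemplate *corev1.PodTemplateSpec) error {
	scheduler, err := mgr.GetScheduler()
	if err != nil {
		return err
	}
	// Register the RayJob with the batch scheduler (for most schedulers this
	// creates a PodGroup or an equivalent gang-scheduling resource).
	if err := scheduler.DoBatchSchedulingOnSubmission(ctx, rayJob); err != nil {
		return err
	}
	// Propagate scheduler metadata (queue, app-id, task groups) from the
	// RayJob to its child RayCluster and the submitter pod template.
	scheduler.AddMetadataToChildResourceFromRayJob(ctx, rayJob, rayCluster, submitterTemplate)
	return nil
}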
@@ -46,7 +46,11 @@ func (v *VolcanoBatchScheduler) Name() string {
 	return GetPluginName()
 }

-func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error {
+func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object client.Object) error {
+	app, ok := object.(*rayv1.RayCluster)
+	if !ok {
+		return fmt.Errorf("currently only RayCluster is supported, got %T", object)
+	}
 	var minMember int32
 	var totalResource corev1.ResourceList
 	if !utils.IsAutoscalingEnabled(&app.Spec) {
@@ -129,18 +133,21 @@ func createPodGroup(
 	return podGroup
 }

-func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
-	pod.Annotations[v1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
+func (v *VolcanoBatchScheduler) AddMetadataToChildResourceFromRayCluster(_ context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
+	pod.Annotations[v1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(rayCluster)
 	pod.Annotations[volcanov1alpha1.TaskSpecKey] = groupName
-	if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
+	if queue, ok := rayCluster.ObjectMeta.Labels[QueueNameLabelKey]; ok {
 		pod.Labels[QueueNameLabelKey] = queue
 	}
-	if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok {
+	if priorityClassName, ok := rayCluster.ObjectMeta.Labels[utils.RayPriorityClassName]; ok {
 		pod.Spec.PriorityClassName = priorityClassName
 	}
 	pod.Spec.SchedulerName = v.Name()
 }

+func (v *VolcanoBatchScheduler) AddMetadataToChildResourceFromRayJob(_ context.Context, _ *rayv1.RayJob, _ *rayv1.RayCluster, _ *corev1.PodTemplateSpec) {
+}
+
 func (vf *VolcanoBatchSchedulerFactory) New(ctx context.Context, config *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
 	// the client has not started yet, so we need to create a new client to check if the PodGroup CRD exists
 	extClient, err := apiextensionsclient.NewForConfig(config)