Skip to content
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
6c0d860
modify batch scheduler interface to support CRD other than RayCluster
troychiu Aug 7, 2025
ebf0cb3
update kai-scheduler to fit batchscheduler interface
owenowenisme Aug 17, 2025
7d7823b
rename GetSchedulerForCluster to GetScheduler
owenowenisme Aug 18, 2025
9e12f53
update
owenowenisme Aug 18, 2025
f5b7df1
update
owenowenisme Aug 19, 2025
7d8c4e8
rename funcs
owenowenisme Aug 19, 2025
a9b6b95
update
owenowenisme Aug 19, 2025
0e9cbd8
add unit test
owenowenisme Aug 20, 2025
4eb38e5
update
owenowenisme Aug 20, 2025
27ccfa2
update sample yaml
owenowenisme Aug 21, 2025
7bf11d1
Update ray-operator/controllers/ray/rayjob_controller.go
owenowenisme Aug 21, 2025
1abda9c
Update ray-operator/controllers/ray/rayjob_controller.go
owenowenisme Aug 21, 2025
da6efc2
remove redundant update rayjob
owenowenisme Aug 21, 2025
51a2d1e
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 23, 2025
0def3ae
update yaml
owenowenisme Aug 23, 2025
6c77276
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 27, 2025
ce99f4e
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 27, 2025
4a89804
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Aug 27, 2025
469e4a8
add logger back
owenowenisme Aug 27, 2025
07e1f0d
rename AddMetadataToChildResourceFromRayJob to AddMetadataToChildReso…
owenowenisme Aug 27, 2025
0bea9a1
add more unit test
owenowenisme Aug 27, 2025
e7e7f20
update unit test
owenowenisme Aug 27, 2025
d44b8be
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
3393d9f
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
e28caa5
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
22f2679
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
dcc6f1c
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
668d6a2
Apply suggestion from @Future-Outlier
owenowenisme Sep 1, 2025
f1e0c6b
remove comment in sample yaml
owenowenisme Sep 1, 2025
7a2d474
update job controller to only use rayjob controller to set taskgroup …
owenowenisme Sep 1, 2025
c2f92a1
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 1, 2025
ce9a513
rename AddMetadataToChildResourceFromRayCluster to AddMetadataToPodFr…
owenowenisme Sep 4, 2025
0551ac1
split AddMetadataToChildResources into 2 functions for RayCluster and…
owenowenisme Sep 4, 2025
8c6d088
Update ray-operator/controllers/ray/batchscheduler/interface/interfac…
owenowenisme Sep 4, 2025
effd7bb
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Sep 4, 2025
12a1b0c
update log
owenowenisme Sep 4, 2025
c7e88b4
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 6, 2025
ee9a61b
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 7, 2025
2b39ede
simplify interface
owenowenisme Sep 7, 2025
1a414f0
add comment
owenowenisme Sep 7, 2025
25eebf2
rename func
owenowenisme Sep 7, 2025
110ba38
rename func
owenowenisme Sep 7, 2025
83954d9
rename AddMetadataToChildResources to AddMetadataToChildResource
owenowenisme Sep 8, 2025
adf84e6
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Sep 9, 2025
afa537f
remove redundant annotation existing check
owenowenisme Sep 9, 2025
68e6cbe
add check if label exist before populate to child
owenowenisme Sep 9, 2025
8f9aef5
rename RayClusterGangSchedulingEnabled
owenowenisme Sep 10, 2025
74df79d
simplify propagateTaskGroupsAnnotation logic
owenowenisme Sep 10, 2025
0d03a21
update
owenowenisme Sep 10, 2025
2fe091e
update comment
owenowenisme Sep 10, 2025
cd09b3e
simplify annotation propagation logic
owenowenisme Sep 10, 2025
2c6e649
Update ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_…
owenowenisme Sep 10, 2025
2646d20
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 15, 2025
ff38824
use-metav1-obj-instead-of-client-object
owenowenisme Sep 15, 2025
69a0531
resolve comment of pattern consistency
owenowenisme Sep 16, 2025
9a6c707
remove repetitive code in create task group
owenowenisme Sep 16, 2025
7cc7c2d
Merge remote-tracking branch 'upstream/master' into rayjob-yunikorn-i…
owenowenisme Sep 17, 2025
0646149
fix unit test
owenowenisme Sep 17, 2025
3066c41
add comment back
owenowenisme Sep 17, 2025
906ab83
Refactor RayJob submitter template handling
owenowenisme Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions ray-operator/config/samples/ray-job.yunikorn-scheduler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-yunikorn-0
labels:
Comment on lines +1 to +5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write some comments specifically for yunikorn-scheduler?
For example, how a user can configure it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought those would be written in the docs, since the RayCluster + batchScheduler sample did not come with such comments.

If you think this is needed, I can open a new PR after this one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated: I will add comments to this yaml and ray-cluster.yunikorn-scheduler.yaml in another PR.

ray.io/gang-scheduling-enabled: "true"
yunikorn.apache.org/app-id: rayjob-yunikorn-0
yunikorn.apache.org/queue: root.test
spec:
entrypoint: python /home/ray/samples/sample_code.py
runtimeEnvYAML: |
pip:
- requests==2.26.0
- pendulum==2.1.2
env_vars:
counter_name: "test_counter"
rayClusterSpec:
rayVersion: '2.46.0'
headGroupSpec:
rayStartParams: {}
template:
spec:
containers:
- name: ray-head
image: rayproject/ray:2.46.0
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
resources:
limits:
cpu: "1"
memory: "2Gi"
requests:
cpu: "1"
memory: "2Gi"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
volumes:
- name: code-sample
configMap:
name: ray-job-code-sample
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
- replicas: 1
minReplicas: 1
maxReplicas: 5
groupName: small-group
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray:2.46.0
resources:
limits:
cpu: "1"
memory: "2Gi"
requests:
cpu: "1"
memory: "2Gi"

---
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code-sample
data:
sample_code.py: |
import ray
import os
import requests

ray.init()

@ray.remote
class Counter:
def __init__(self):
# Used to verify runtimeEnv
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0

def inc(self):
self.counter += 1

def get_counter(self):
return "{} got {}".format(self.name, self.counter)

counter = Counter.remote()

for _ in range(5):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))

# Verify that the correct runtime env was used for the job.
assert requests.__version__ == "2.26.0"
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -18,13 +19,18 @@ type BatchScheduler interface {
// https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/
Name() string

// DoBatchSchedulingOnSubmission handles submitting the RayCluster to the batch scheduler on creation / update
// DoBatchSchedulingOnSubmission handles submitting the RayCluster/RayJob to the batch scheduler on creation / update
// For most batch schedulers, this results in the creation of a PodGroup.
DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error
DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error

// AddMetadataToPod enriches Pod specs with metadata necessary to tie them to the scheduler.
// AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment saying that we are going to remove this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added!

// For example, setting labels for queues / priority, and setting schedulerName.
AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod)
// This function will be removed once the RayJob Volcano scheduler integration is completed.
AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove AddMetadataToPod method in Volcano PR.


// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
// For example, setting labels for queues / priority, and setting schedulerName.
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
}

// BatchSchedulerFactory handles initial setup of the scheduler plugin by registering the
Expand Down Expand Up @@ -53,13 +59,16 @@ func (d *DefaultBatchScheduler) Name() string {
return GetDefaultPluginName()
}

func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1.Object) error {
return nil
}

// AddMetadataToPod is a no-op for the default scheduler: every argument is
// ignored and the pod is left unmodified.
func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
}

// AddMetadataToChildResource is a no-op for the default scheduler: the parent,
// the child and the group name are all ignored and nothing is modified.
func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

// New returns a fresh DefaultBatchScheduler and a nil error.
// The context, REST config and client arguments are not used.
func (df *DefaultBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (BatchScheduler, error) {
return &DefaultBatchScheduler{}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -33,7 +34,7 @@ func GetPluginName() string { return "kai-scheduler" }

// Name returns the scheduler plugin name as reported by GetPluginName.
func (k *KaiScheduler) Name() string { return GetPluginName() }

func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1.Object) error {
return nil
}

Expand All @@ -53,6 +54,9 @@ func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayClust
pod.Labels[QueueLabelName] = queue
}

// AddMetadataToChildResource is a no-op for KaiScheduler: the parent, the
// child and the group name are all ignored and nothing is modified.
func (k *KaiScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

// New returns a fresh KaiScheduler and a nil error.
// The context, REST config and client arguments are not used.
func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
return &KaiScheduler{}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ func createPodGroup(ctx context.Context, app *rayv1.RayCluster) *v1alpha1.PodGro
return podGroup
}

func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error {
func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error {
app, ok := object.(*rayv1.RayCluster)
if !ok {
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
}
if !k.isGangSchedulingEnabled(app) {
return nil
}
Expand All @@ -89,18 +93,21 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *
return nil
}

// AddMetadataToPod adds essential labels and annotations to the Ray pods
// AddMetadataToPod adds essential labels and annotations to the Ray pod
// the scheduler needs these labels and annotations in order to do the scheduling properly
func (k *KubeScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) {
// when gang scheduling is enabled, extra labels need to be added to all pods
if k.isGangSchedulingEnabled(app) {
pod.Labels[kubeSchedulerPodGroupLabelKey] = app.Name
if k.isGangSchedulingEnabled(rayCluster) {
pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name
}
pod.Spec.SchedulerName = k.Name()
}

// AddMetadataToChildResource is a no-op for KubeScheduler: the parent, the
// child and the group name are all ignored and nothing is modified.
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
_, exist := app.Labels[utils.RayGangSchedulingEnabled]
return exist
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
return factory, nil
}

func (batch *SchedulerManager) GetSchedulerForCluster() (schedulerinterface.BatchScheduler, error) {
func (batch *SchedulerManager) GetScheduler() (schedulerinterface.BatchScheduler, error) {
return batch.scheduler, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ func (v *VolcanoBatchScheduler) Name() string {
return GetPluginName()
}

func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error {
func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error {
app, ok := object.(*rayv1.RayCluster)
if !ok {
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
}
var minMember int32
var totalResource corev1.ResourceList
if !utils.IsAutoscalingEnabled(&app.Spec) {
Expand Down Expand Up @@ -139,6 +143,9 @@ func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.R
pod.Spec.SchedulerName = v.Name()
}

// AddMetadataToChildResource is a no-op for VolcanoBatchScheduler: the parent,
// the child and the group name are all ignored and nothing is modified.
func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
if err := volcanov1beta1.AddToScheme(cli.Scheme()); err != nil {
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)
Expand Down
Loading
Loading