Skip to content

Commit c9fa013

Browse files
owenowenismetroychiuwin5923Future-Outlier
authored
[RayJob] Yunikorn Integration (#3948)
--------- Signed-off-by: Troy Chiu <[email protected]> Signed-off-by: You-Cheng Lin (Owen) <[email protected]> Signed-off-by: Owen Lin (You-Cheng Lin) <[email protected]> Co-authored-by: Troy Chiu <[email protected]> Co-authored-by: Jun-Hao Wan <[email protected]> Co-authored-by: Han-Ju Chen (Future-Outlier) <[email protected]>
1 parent 9c55fc4 commit c9fa013

File tree

17 files changed

+837
-171
lines changed

17 files changed

+837
-171
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
apiVersion: ray.io/v1
2+
kind: RayJob
3+
metadata:
4+
name: rayjob-yunikorn-0
5+
labels:
6+
ray.io/gang-scheduling-enabled: "true"
7+
yunikorn.apache.org/app-id: rayjob-yunikorn-0
8+
yunikorn.apache.org/queue: root.test
9+
spec:
10+
entrypoint: python /home/ray/samples/sample_code.py
11+
runtimeEnvYAML: |
12+
pip:
13+
- requests==2.26.0
14+
- pendulum==2.1.2
15+
env_vars:
16+
counter_name: "test_counter"
17+
rayClusterSpec:
18+
rayVersion: '2.46.0'
19+
headGroupSpec:
20+
rayStartParams: {}
21+
template:
22+
spec:
23+
containers:
24+
- name: ray-head
25+
image: rayproject/ray:2.46.0
26+
ports:
27+
- containerPort: 6379
28+
name: gcs-server
29+
- containerPort: 8265
30+
name: dashboard
31+
- containerPort: 10001
32+
name: client
33+
resources:
34+
limits:
35+
cpu: "1"
36+
memory: "2Gi"
37+
requests:
38+
cpu: "1"
39+
memory: "2Gi"
40+
volumeMounts:
41+
- mountPath: /home/ray/samples
42+
name: code-sample
43+
volumes:
44+
- name: code-sample
45+
configMap:
46+
name: ray-job-code-sample
47+
items:
48+
- key: sample_code.py
49+
path: sample_code.py
50+
workerGroupSpecs:
51+
- replicas: 1
52+
minReplicas: 1
53+
maxReplicas: 5
54+
groupName: small-group
55+
rayStartParams: {}
56+
template:
57+
spec:
58+
containers:
59+
- name: ray-worker
60+
image: rayproject/ray:2.46.0
61+
resources:
62+
limits:
63+
cpu: "1"
64+
memory: "2Gi"
65+
requests:
66+
cpu: "1"
67+
memory: "2Gi"
68+
69+
---
70+
apiVersion: v1
71+
kind: ConfigMap
72+
metadata:
73+
name: ray-job-code-sample
74+
data:
75+
sample_code.py: |
76+
import ray
77+
import os
78+
import requests
79+
80+
ray.init()
81+
82+
@ray.remote
83+
class Counter:
84+
def __init__(self):
85+
# Used to verify runtimeEnv
86+
self.name = os.getenv("counter_name")
87+
assert self.name == "test_counter"
88+
self.counter = 0
89+
90+
def inc(self):
91+
self.counter += 1
92+
93+
def get_counter(self):
94+
return "{} got {}".format(self.name, self.counter)
95+
96+
counter = Counter.remote()
97+
98+
for _ in range(5):
99+
ray.get(counter.inc.remote())
100+
print(ray.get(counter.get_counter.remote()))
101+
102+
# Verify that the correct runtime env was used for the job.
103+
assert requests.__version__ == "2.26.0"

ray-operator/controllers/ray/batchscheduler/interface/interface.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
corev1 "k8s.io/api/core/v1"
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
78
"k8s.io/apimachinery/pkg/runtime"
89
"k8s.io/client-go/rest"
910
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -18,13 +19,18 @@ type BatchScheduler interface {
1819
// https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/
1920
Name() string
2021

21-
// DoBatchSchedulingOnSubmission handles submitting the RayCluster to the batch scheduler on creation / update
22+
// DoBatchSchedulingOnSubmission handles submitting the RayCluster/RayJob to the batch scheduler on creation / update
2223
// For most batch schedulers, this results in the creation of a PodGroup.
23-
DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error
24+
DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error
2425

25-
// AddMetadataToPod enriches Pod specs with metadata necessary to tie them to the scheduler.
26+
// AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler.
2627
// For example, setting labels for queues / priority, and setting schedulerName.
27-
AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod)
28+
// This function will be removed once Rayjob Volcano scheduler integration is completed.
29+
AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod)
30+
31+
// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
32+
// For example, setting labels for queues / priority, and setting schedulerName.
33+
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
2834
}
2935

3036
// BatchSchedulerFactory handles initial setup of the scheduler plugin by registering the
@@ -53,13 +59,16 @@ func (d *DefaultBatchScheduler) Name() string {
5359
return GetDefaultPluginName()
5460
}
5561

56-
func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
62+
func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1.Object) error {
5763
return nil
5864
}
5965

6066
func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
6167
}
6268

69+
func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
70+
}
71+
6372
func (df *DefaultBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (BatchScheduler, error) {
6473
return &DefaultBatchScheduler{}, nil
6574
}

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"context"
1212

1313
corev1 "k8s.io/api/core/v1"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
"k8s.io/apimachinery/pkg/runtime"
1516
"k8s.io/client-go/rest"
1617
ctrl "sigs.k8s.io/controller-runtime"
@@ -33,7 +34,7 @@ func GetPluginName() string { return "kai-scheduler" }
3334

3435
func (k *KaiScheduler) Name() string { return GetPluginName() }
3536

36-
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
37+
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1.Object) error {
3738
return nil
3839
}
3940

@@ -53,6 +54,9 @@ func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayClust
5354
pod.Labels[QueueLabelName] = queue
5455
}
5556

57+
func (k *KaiScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
58+
}
59+
5660
func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
5761
return &KaiScheduler{}, nil
5862
}

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ func createPodGroup(ctx context.Context, app *rayv1.RayCluster) *v1alpha1.PodGro
6969
return podGroup
7070
}
7171

72-
func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error {
72+
func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error {
73+
app, ok := object.(*rayv1.RayCluster)
74+
if !ok {
75+
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
76+
}
7377
if !k.isGangSchedulingEnabled(app) {
7478
return nil
7579
}
@@ -89,18 +93,21 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *
8993
return nil
9094
}
9195

92-
// AddMetadataToPod adds essential labels and annotations to the Ray pods
96+
// AddMetadataToPod adds essential labels and annotations to the Ray pod
9397
// the scheduler needs these labels and annotations in order to do the scheduling properly
94-
func (k *KubeScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
98+
func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) {
9599
// when gang scheduling is enabled, extra labels need to be added to all pods
96-
if k.isGangSchedulingEnabled(app) {
97-
pod.Labels[kubeSchedulerPodGroupLabelKey] = app.Name
100+
if k.isGangSchedulingEnabled(rayCluster) {
101+
pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name
98102
}
99103
pod.Spec.SchedulerName = k.Name()
100104
}
101105

106+
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
107+
}
108+
102109
func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
103-
_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
110+
_, exist := app.Labels[utils.RayGangSchedulingEnabled]
104111
return exist
105112
}
106113

ray-operator/controllers/ray/batchscheduler/schedulermanager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
8383
return factory, nil
8484
}
8585

86-
func (batch *SchedulerManager) GetSchedulerForCluster() (schedulerinterface.BatchScheduler, error) {
86+
func (batch *SchedulerManager) GetScheduler() (schedulerinterface.BatchScheduler, error) {
8787
return batch.scheduler, nil
8888
}
8989

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ func (v *VolcanoBatchScheduler) Name() string {
4242
return GetPluginName()
4343
}
4444

45-
func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, app *rayv1.RayCluster) error {
45+
func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error {
46+
app, ok := object.(*rayv1.RayCluster)
47+
if !ok {
48+
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
49+
}
4650
var minMember int32
4751
var totalResource corev1.ResourceList
4852
if !utils.IsAutoscalingEnabled(&app.Spec) {
@@ -139,6 +143,9 @@ func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.R
139143
pod.Spec.SchedulerName = v.Name()
140144
}
141145

146+
func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
147+
}
148+
142149
func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
143150
if err := volcanov1beta1.AddToScheme(cli.Scheme()); err != nil {
144151
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)

0 commit comments

Comments
 (0)