Skip to content

Commit 4eb38e5

Browse files
committed
update
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
1 parent 0e9cbd8 commit 4eb38e5

File tree

4 files changed

+18
-33
lines changed

4 files changed

+18
-33
lines changed

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package kaischeduler
99

1010
import (
1111
"context"
12-
"fmt"
1312

1413
"github.com/go-logr/logr"
1514
corev1 "k8s.io/api/core/v1"
@@ -37,13 +36,7 @@ func GetPluginName() string { return "kai-scheduler" }
3736

3837
func (k *KaiScheduler) Name() string { return GetPluginName() }
3938

40-
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, object client.Object) error {
41-
_, ok := object.(*rayv1.RayCluster)
42-
if !ok {
43-
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
44-
}
45-
// yunikorn doesn't require any resources to be created upfront
46-
// this is a no-opt for this implementation
39+
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ client.Object) error {
4740
return nil
4841
}
4942

ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package yunikorn
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/go-logr/logr"
87
corev1 "k8s.io/api/core/v1"
@@ -41,13 +40,7 @@ func (y *YuniKornScheduler) Name() string {
4140
return GetPluginName()
4241
}
4342

44-
func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, object client.Object) error {
45-
switch obj := object.(type) {
46-
case *rayv1.RayCluster, *rayv1.RayJob:
47-
// Supported types
48-
default:
49-
return fmt.Errorf("currently only RayCluster and RayJob are supported, got %T", obj)
50-
}
43+
func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ client.Object) error {
5144
// yunikorn doesn't require any resources to be created upfront
5245
// this is a no-opt for this implementation
5346
return nil
@@ -59,9 +52,9 @@ func (y *YuniKornScheduler) DoBatchSchedulingOnSubmission(_ context.Context, obj
5952
// Currently we use this function to translate labels "yunikorn.apache.org/app-id" and "yunikorn.apache.org/queue"
6053
// to legacy labels "applicationId" and "queue", this is for the better compatibilities to support older yunikorn
6154
// versions.
62-
func (y *YuniKornScheduler) populatePodLabelsFromRayCluster(_ context.Context, app *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) {
55+
func (y *YuniKornScheduler) populatePodLabelsFromRayCluster(_ context.Context, rayCluster *rayv1.RayCluster, pod *corev1.Pod, sourceKey string, targetKey string) {
6356
// check labels
64-
if value, exist := app.Labels[sourceKey]; exist {
57+
if value, exist := rayCluster.Labels[sourceKey]; exist {
6558
y.logger.Info("Updating pod label based on RayCluster labels",
6659
"sourceKey", sourceKey, "targetKey", targetKey, "value", value)
6760
pod.Labels[targetKey] = value
@@ -144,14 +137,14 @@ func (y *YuniKornScheduler) isGangSchedulingEnabled(obj client.Object) bool {
144137
}
145138
}
146139

147-
func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(_ context.Context, app *rayv1.RayCluster, pod *corev1.Pod) {
140+
func (y *YuniKornScheduler) populateTaskGroupsAnnotationToPod(_ context.Context, rayCluster *rayv1.RayCluster, pod *corev1.Pod) {
148141
var taskGroupsAnnotationValue string
149142
var err error
150-
if app.Annotations[YuniKornTaskGroupsAnnotationName] != "" {
151-
taskGroupsAnnotationValue = app.Annotations[YuniKornTaskGroupsAnnotationName]
143+
if rayCluster.Annotations[YuniKornTaskGroupsAnnotationName] != "" {
144+
taskGroupsAnnotationValue = rayCluster.Annotations[YuniKornTaskGroupsAnnotationName]
152145
y.logger.Info("using existing task groups annotation from RayCluster", "value", taskGroupsAnnotationValue)
153146
} else {
154-
taskGroups := newTaskGroupsFromApp(app)
147+
taskGroups := newTaskGroupsFromRayCluster(rayCluster)
155148
taskGroupsAnnotationValue, err = taskGroups.marshal()
156149
if err != nil {
157150
y.logger.Error(err, "failed to add gang scheduling related annotations to pod, "+

ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,7 @@ func createRayJobWithLabels(name string, namespace string, rayClusterSpec *rayv1
391391
return rayJob
392392
}
393393

394-
func addHeadPodSpec(app *rayv1.RayCluster, resource corev1.ResourceList) {
395-
// app.Spec.HeadGroupSpec.Template.Spec.Containers
394+
func addHeadPodSpec(rayCluster *rayv1.RayCluster, resource corev1.ResourceList) {
396395
headContainers := []corev1.Container{
397396
{
398397
Name: "head-pod",
@@ -404,10 +403,10 @@ func addHeadPodSpec(app *rayv1.RayCluster, resource corev1.ResourceList) {
404403
},
405404
}
406405

407-
app.Spec.HeadGroupSpec.Template.Spec.Containers = headContainers
406+
rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers = headContainers
408407
}
409408

410-
func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string,
409+
func addWorkerPodSpec(rayCluster *rayv1.RayCluster, workerGroupName string,
411410
replicas int32, minReplicas int32, maxReplicas int32, resources corev1.ResourceList,
412411
) {
413412
workerContainers := []corev1.Container{
@@ -421,7 +420,7 @@ func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string,
421420
},
422421
}
423422

424-
app.Spec.WorkerGroupSpecs = append(app.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{
423+
rayCluster.Spec.WorkerGroupSpecs = append(rayCluster.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{
425424
GroupName: workerGroupName,
426425
Replicas: &replicas,
427426
MinReplicas: &minReplicas,

ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_task_groups.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ func newTaskGroups() *TaskGroups {
3434
}
3535
}
3636

37-
func newTaskGroupsFromApp(app *v1.RayCluster) *TaskGroups {
37+
func newTaskGroupsFromRayCluster(rayCluster *v1.RayCluster) *TaskGroups {
3838
taskGroups := newTaskGroups()
3939

4040
// head group
41-
headGroupSpec := app.Spec.HeadGroupSpec
41+
headGroupSpec := rayCluster.Spec.HeadGroupSpec
4242
headPodMinResource := utils.CalculatePodResource(headGroupSpec.Template.Spec)
4343
taskGroups.addTaskGroup(
4444
TaskGroup{
@@ -51,7 +51,7 @@ func newTaskGroupsFromApp(app *v1.RayCluster) *TaskGroups {
5151
})
5252

5353
// worker groups
54-
for _, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
54+
for _, workerGroupSpec := range rayCluster.Spec.WorkerGroupSpecs {
5555
workerMinResource := utils.CalculatePodResource(workerGroupSpec.Template.Spec)
5656
minWorkers := workerGroupSpec.MinReplicas
5757
taskGroups.addTaskGroup(
@@ -68,11 +68,11 @@ func newTaskGroupsFromApp(app *v1.RayCluster) *TaskGroups {
6868
return taskGroups
6969
}
7070

71-
func newTaskGroupsFromRayJob(app *v1.RayJob, submitterTemplate *corev1.PodTemplateSpec) (*TaskGroups, error) {
71+
func newTaskGroupsFromRayJob(rayJob *v1.RayJob, submitterTemplate *corev1.PodTemplateSpec) (*TaskGroups, error) {
7272
taskGroups := newTaskGroups()
7373

7474
// head group
75-
headGroupSpec := app.Spec.RayClusterSpec.HeadGroupSpec
75+
headGroupSpec := rayJob.Spec.RayClusterSpec.HeadGroupSpec
7676
headPodMinResource := utils.CalculatePodResource(headGroupSpec.Template.Spec)
7777
taskGroups.addTaskGroup(
7878
TaskGroup{
@@ -85,7 +85,7 @@ func newTaskGroupsFromRayJob(app *v1.RayJob, submitterTemplate *corev1.PodTempla
8585
})
8686

8787
// worker groups
88-
for _, workerGroupSpec := range app.Spec.RayClusterSpec.WorkerGroupSpecs {
88+
for _, workerGroupSpec := range rayJob.Spec.RayClusterSpec.WorkerGroupSpecs {
8989
workerMinResource := utils.CalculatePodResource(workerGroupSpec.Template.Spec)
9090
minWorkers := workerGroupSpec.MinReplicas
9191
taskGroups.addTaskGroup(

0 commit comments

Comments
 (0)