111 changes: 111 additions & 0 deletions ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml
@@ -0,0 +1,111 @@
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: kuberay-test-queue
spec:
  weight: 1
  capability:
    cpu: 4
    memory: 6Gi
---
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample-0
  labels:
    ray.io/scheduler-name: volcano
    volcano.sh/queue-name: kuberay-test-queue
Comment on lines +15 to +17
Member:
Hi @win5923,
Can we add ray.io/gang-scheduling-enabled: "true" to the example and test it?

Collaborator Author (@win5923, Oct 8, 2025):
Currently, adding ray.io/gang-scheduling-enabled: "true" has no effect for Volcano. The label is only checked by the YuniKorn and KubeScheduler plugins:

func (y *YuniKornScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
_, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled]
return exist
}

func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
_, exist := app.Labels[utils.RayGangSchedulingEnabled]
return exist
}

I also think adding this check would be a breaking change. We should update the doc to mention that, starting from version 1.5.0, users need to add ray.io/gang-scheduling-enabled: "true" to enable gang scheduling with Volcano: https://docs.ray.io/en/latest/cluster/kubernetes/k8s-ecosystem/volcano.html
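
A minimal sketch of what such a check could look like for Volcano, following the same pattern as the two plugins above (hypothetical; this function does not exist in the current code):

// Hypothetical sketch only: mirrors the YuniKorn and KubeScheduler checks above.
func (v *VolcanoBatchScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
	_, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled]
	return exist
}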

Member:
I found that Volcano's default scheduler ConfigMap has gang scheduling enabled! So in the future, if users want to disable it, we might need to tell them to edit the ConfigMap or figure out some way to control it by adding more information to our CR.

thank you!!

apiVersion: v1
data:
  volcano-scheduler.conf: |
    actions: "enqueue, allocate, backfill"
    tiers:
    - plugins:
      - name: priority
      - name: gang
        enablePreemptable: false
      - name: conformance
    - plugins:
      - name: overcommit
      - name: drf
        enablePreemptable: false
      - name: predicates
      - name: proportion
      - name: nodeorder
      - name: binpack

spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"
  rayClusterSpec:
    rayVersion: '2.46.0'
    headGroupSpec:
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.46.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                  memory: "2Gi"
                requests:
                  cpu: "1"
                  memory: "2Gi"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                name: ray-job-code-sample
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 2
        minReplicas: 2
        maxReplicas: 2
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray:2.46.0
                resources:
                  limits:
                    cpu: "1"
                    memory: "1Gi"
                  requests:
                    cpu: "1"
                    memory: "1Gi"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()
    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"
@@ -15,16 +15,18 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
volcanov1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

const (
PodGroupName = "podgroups.scheduling.volcano.sh"
Collaborator Author:
This variable is unused and can be removed, I think.

Collaborator:
I think we can just remove it

pluginName = "volcano"
QueueNameLabelKey = "volcano.sh/queue-name"
)

@@ -34,106 +36,189 @@ type VolcanoBatchScheduler struct {

type VolcanoBatchSchedulerFactory struct{}

func GetPluginName() string {
return "volcano"
}
func GetPluginName() string { return pluginName }

func (v *VolcanoBatchScheduler) Name() string {
return GetPluginName()
}

func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error {
app, ok := object.(*rayv1.RayCluster)
if !ok {
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
}
var minMember int32
var totalResource corev1.ResourceList
if !utils.IsAutoscalingEnabled(&app.Spec) {
minMember = utils.CalculateDesiredReplicas(ctx, app) + 1
totalResource = utils.CalculateDesiredResources(app)
} else {
minMember = utils.CalculateMinReplicas(app) + 1
totalResource = utils.CalculateMinResources(app)
switch obj := object.(type) {
case *rayv1.RayCluster:
return v.handleRayCluster(ctx, obj)
case *rayv1.RayJob:
return v.handleRayJob(ctx, obj)
default:
return fmt.Errorf("unsupported object type %T, only RayCluster and RayJob are supported", object)
}
}

// handleRayCluster calculates the PodGroup MinMember and MinResources for a RayCluster
func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster *rayv1.RayCluster) error {
// Check if this RayCluster is created by a RayJob, if so, skip PodGroup creation
if crdType, ok := raycluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok && crdType == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) {
return nil
}

return v.syncPodGroup(ctx, app, minMember, totalResource)
minMember, totalResource := v.calculatePodGroupParams(ctx, &raycluster.Spec)

return v.syncPodGroup(ctx, raycluster, minMember, totalResource)
}

func getAppPodGroupName(app *rayv1.RayCluster) string {
return fmt.Sprintf("ray-%s-pg", app.Name)
// handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob
func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error {
if rayJob.Spec.RayClusterSpec == nil {
return fmt.Errorf("gang scheduling does not support RayJob %s/%s referencing an existing RayCluster", rayJob.Namespace, rayJob.Name)
}

var totalResourceList []corev1.ResourceList
minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec)
totalResourceList = append(totalResourceList, totalResource)

// MinMember intentionally excludes the submitter pod to avoid a startup deadlock
// (submitter waits for cluster; gang would wait for submitter). We still add the
// submitter's resource requests into MinResources so capacity is reserved.
Comment on lines +78 to +80
Copilot AI (Oct 7, 2025):
The comment explains the design decision well, but could be clearer about what 'gang would wait for submitter' means. Consider expanding to explain that the gang scheduler would wait for all pods including the submitter to be schedulable before scheduling any, creating a circular dependency.

Suggested change
// MinMember intentionally excludes the submitter pod to avoid a startup deadlock
// (submitter waits for cluster; gang would wait for submitter). We still add the
// submitter's resource requests into MinResources so capacity is reserved.
// MinMember intentionally excludes the submitter pod to avoid a startup deadlock.
// If the submitter pod were included in MinMember, the gang scheduler would wait for
// all pods—including the submitter—to be schedulable before scheduling any of them.
// This creates a circular dependency: the submitter pod waits for the cluster to be ready,
// but the cluster cannot be scheduled until the submitter is also schedulable. To avoid this,
// we exclude the submitter from MinMember, but still add its resource requests into MinResources
// so that capacity is reserved for it.

if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode || rayJob.Spec.SubmissionMode == rayv1.SidecarMode {
submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec)
submitterResource := utils.CalculatePodResource(submitterTemplate.Spec)
totalResourceList = append(totalResourceList, submitterResource)
}
Member:
I think the result of this code is correct, but the behavior is not:
For K8s job mode, we should get the submitter's information from submitterTemplate.
For sidecar mode, we should get the submitter's information from GetDefaultSubmitterContainer, since that is the function we currently use.

Update:
I've discussed this offline with @win5923 and am writing a commit to fix it!
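
A rough sketch of the mode-specific handling described above (illustrative only, not the actual commit; the package and exact signature of GetDefaultSubmitterContainer are assumed here):

// Illustrative sketch: gather submitter resources per submission mode.
var submitterResource corev1.ResourceList
switch rayJob.Spec.SubmissionMode {
case rayv1.K8sJobMode:
	// K8s mode: the submitter runs as a separate Job, so use its full pod template.
	submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec)
	submitterResource = utils.CalculatePodResource(submitterTemplate.Spec)
case rayv1.SidecarMode:
	// Sidecar mode: only the default submitter container is injected into the head pod,
	// so account for that container alone (assumed helper and signature).
	submitterContainer := common.GetDefaultSubmitterContainer(rayJob.Spec.RayClusterSpec)
	submitterResource = submitterContainer.Resources.Requests
}
totalResourceList = append(totalResourceList, submitterResource)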

Collaborator:
YuniKorn also uses GetSubmitterTemplate for sidecar mode. Would you like to update the YuniKorn part as well (newTaskGroupsFromRayJobSpec)?

Member:
yes I can do it now

Member:
Sidecar mode (only 2 items in task-groups; the head pod resource = head container + submitter container):

[screenshot]

K8s mode (3 items in task-groups: head pod, worker pod, and submitter pod):

[screenshot]

Member (@Future-Outlier, Oct 9, 2025):
Hi, @rueian
I have tested the YuniKorn integration in both Kubernetes job mode and sidecar mode and can confirm that no code changes are required. The existing logic correctly handles both scenarios.

Kubernetes Job Mode

The function newTaskGroupsFromRayJobSpec is ultimately called by AddMetadataToChildResource. Within the RayJob controller, AddMetadataToChildResource is only invoked when the RayJob is configured for Kubernetes job mode, as seen in these two locations:

1st place:

func (r *RayJobReconciler) createK8sJobIfNeed(ctx context.Context, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) error {
	logger := ctrl.LoggerFrom(ctx)
	job := &batchv1.Job{}
	namespacedName := common.RayJobK8sJobNamespacedName(rayJobInstance)
	if err := r.Client.Get(ctx, namespacedName, job); err != nil {
		if errors.IsNotFound(err) {
			submitterTemplate, err := getSubmitterTemplate(rayJobInstance, rayClusterInstance)
			if err != nil {
				return err
			}
			if r.options.BatchSchedulerManager != nil {
				if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
					scheduler.AddMetadataToChildResource(ctx, rayJobInstance, &submitterTemplate, utils.RayNodeSubmitterGroupLabelValue)
				} else {
					return err
				}
			}
			return r.createNewK8sJob(ctx, rayJobInstance, submitterTemplate)
		}
		return err
	}
	logger.Info("The submitter Kubernetes Job for RayJob already exists", "Kubernetes Job", job.Name)
	return nil
}

2nd place:
if r.options.BatchSchedulerManager != nil && rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
	if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
		// Group name is only used for individual pods to specify their task group ("headgroup", "worker-group-1", etc.).
		// RayCluster contains multiple groups, so we pass an empty string.
		scheduler.AddMetadataToChildResource(ctx, rayJobInstance, rayClusterInstance, "")
	} else {
		return nil, err
	}
}

That's why it behaves correctly.

Because of this, the YuniKorn-specific logic is correctly applied only when the RayJob creates a Kubernetes Job, and it behaves as expected.

Sidecar Mode

In sidecar mode, the submitter container is added to the Ray head pod, which is part of the RayCluster specification. When the RayCluster controller reconciles the RayCluster custom resource, it calculates the task groups for the head and worker pods. At that point, the head pod correctly contains both the Ray head container and the submitter sidecar container, ensuring their resources are accounted for in the task group calculation, as handled by the logic here:

func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.RayJob, rayClusterName string) (*rayv1.RayCluster, error) {
	labels := make(map[string]string, len(rayJobInstance.Labels))
	for key, value := range rayJobInstance.Labels {
		labels[key] = value
	}
	labels[utils.RayOriginatedFromCRNameLabelKey] = rayJobInstance.Name
	labels[utils.RayOriginatedFromCRDLabelKey] = utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD)

	rayCluster := &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      labels,
			Annotations: rayJobInstance.Annotations,
			Name:        rayClusterName,
			Namespace:   rayJobInstance.Namespace,
		},
		Spec: *rayJobInstance.Spec.RayClusterSpec.DeepCopy(),
	}

	// Set the ownership in order to do the garbage collection by k8s.
	if err := ctrl.SetControllerReference(rayJobInstance, rayCluster, r.Scheme); err != nil {
		return nil, err
	}

	// Inject a submitter container into the head Pod in SidecarMode.
	if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
		sidecar, err := getSubmitterContainer(rayJobInstance, rayCluster)
		if err != nil {
			return nil, err
		}
		rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers = append(
			rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers, sidecar)
		// In K8sJobMode, the submitter Job relies on the K8s Job backoffLimit API to restart if it fails.
		// This mainly handles WebSocket connection failures caused by transient network issues.
		// In SidecarMode, however, the submitter container shares the same network namespace as the Ray dashboard,
		// so restarts are no longer needed.
		rayCluster.Spec.HeadGroupSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
	}
	return rayCluster, nil
}


return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList))
}

func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, app *rayv1.RayCluster, size int32, totalResource corev1.ResourceList) error {
logger := ctrl.LoggerFrom(ctx).WithName(v.Name())
func getAppPodGroupName(object metav1.Object) string {
// Prefer the RayJob name if this object originated from a RayJob
name := object.GetName()
if labels := object.GetLabels(); labels != nil &&
labels[utils.RayOriginatedFromCRDLabelKey] == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) {
if rayJobName := labels[utils.RayOriginatedFromCRNameLabelKey]; rayJobName != "" {
name = rayJobName
}
}
return fmt.Sprintf("ray-%s-pg", name)
}

func addSchedulerName(obj metav1.Object, schedulerName string) {
switch obj := obj.(type) {
case *corev1.Pod:
obj.Spec.SchedulerName = schedulerName
case *corev1.PodTemplateSpec:
obj.Spec.SchedulerName = schedulerName
}
}

func populateAnnotations(parent metav1.Object, child metav1.Object, groupName string) {
annotations := child.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(parent)
annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName
child.SetAnnotations(annotations)
}

podGroupName := getAppPodGroupName(app)
podGroup := volcanov1beta1.PodGroup{}
if err := v.cli.Get(ctx, types.NamespacedName{Namespace: app.Namespace, Name: podGroupName}, &podGroup); err != nil {
func populateLabelsFromObject(parent metav1.Object, child metav1.Object, key string) {
labels := child.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
if parentLabel, exist := parent.GetLabels()[key]; exist && parentLabel != "" {
labels[key] = parentLabel
}
child.SetLabels(labels)
}

// syncPodGroup ensures a Volcano PodGroup exists/updated for the given object
// with the provided size (MinMember) and total resources.
func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.Object, size int32, totalResource corev1.ResourceList) error {
logger := ctrl.LoggerFrom(ctx).WithName(pluginName)

podGroupName := getAppPodGroupName(owner)
podGroup := volcanoschedulingv1beta1.PodGroup{}
if err := v.cli.Get(ctx, types.NamespacedName{Namespace: owner.GetNamespace(), Name: podGroupName}, &podGroup); err != nil {
if !errors.IsNotFound(err) {
logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace())
Copilot AI (Oct 7, 2025):
Potential nil pointer dereference if owner.GetLabels() returns nil. The code should check if labels exist before accessing the map.

return err
}

podGroup := createPodGroup(app, podGroupName, size, totalResource)
podGroup := createPodGroup(owner, podGroupName, size, totalResource)
if err := v.cli.Create(ctx, &podGroup); err != nil {
if errors.IsAlreadyExists(err) {
logger.Info("pod group already exists, no need to create")
logger.Info("podGroup already exists, no need to create", "name", podGroupName)
return nil
}

logger.Error(err, "Pod group CREATE error!", "PodGroup.Error", err)
logger.Error(err, "failed to create PodGroup", "name", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace())
Copilot AI (Oct 7, 2025):
Potential nil pointer dereference if owner.GetLabels() returns nil. The code should check if labels exist before accessing the map.

return err
}
} else {
if podGroup.Spec.MinMember != size || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) {
if podGroup.Spec.MinMember != size || podGroup.Spec.MinResources == nil || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) {
podGroup.Spec.MinMember = size
podGroup.Spec.MinResources = &totalResource
if err := v.cli.Update(ctx, &podGroup); err != nil {
logger.Error(err, "Pod group UPDATE error!", "podGroup", podGroupName)
logger.Error(err, "failed to update PodGroup", "name", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace())
Copilot AI (Oct 7, 2025):
Potential nil pointer dereference if owner.GetLabels() returns nil. The code should check if labels exist before accessing the map.

return err
}
}
}
return nil
}

func createPodGroup(
app *rayv1.RayCluster,
podGroupName string,
size int32,
totalResource corev1.ResourceList,
) volcanov1beta1.PodGroup {
podGroup := volcanov1beta1.PodGroup{
func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, rayClusterSpec *rayv1.RayClusterSpec) (int32, corev1.ResourceList) {
rayCluster := &rayv1.RayCluster{Spec: *rayClusterSpec}

if !utils.IsAutoscalingEnabled(rayClusterSpec) {
return utils.CalculateDesiredReplicas(ctx, rayCluster) + 1, utils.CalculateDesiredResources(rayCluster)
}
return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster)
}

func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup {
var ownerRef metav1.OwnerReference
switch obj := owner.(type) {
case *rayv1.RayCluster:
ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayCluster"))
case *rayv1.RayJob:
ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayJob"))
}

podGroup := volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: app.Namespace,
Name: podGroupName,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(app, rayv1.SchemeGroupVersion.WithKind("RayCluster")),
},
Namespace: owner.GetNamespace(),
Name: podGroupName,
OwnerReferences: []metav1.OwnerReference{ownerRef},
},
Spec: volcanov1beta1.PodGroupSpec{
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: size,
MinResources: &totalResource,
},
Status: volcanov1beta1.PodGroupStatus{
Phase: volcanov1beta1.PodGroupPending,
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupPending,
},
}

if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok {
podGroup.Spec.Queue = queue
}

if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok {
if priorityClassName, ok := owner.GetLabels()[utils.RayPriorityClassName]; ok {
podGroup.Spec.PriorityClassName = priorityClassName
}

return podGroup
}

func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, groupName string) {
populateLabelsFromObject(parent, child, QueueNameLabelKey)
populateLabelsFromObject(parent, child, utils.RayPriorityClassName)
populateAnnotations(parent, child, groupName)
addSchedulerName(child, v.Name())
}

// This function will be removed in interface migration PR
func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
pod.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
pod.Annotations[volcanov1alpha1.TaskSpecKey] = groupName
pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName
if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
pod.Labels[QueueNameLabelKey] = queue
}
@@ -143,11 +228,8 @@ func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.R
pod.Spec.SchedulerName = v.Name()
}

func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
if err := volcanov1beta1.AddToScheme(cli.Scheme()); err != nil {
if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)
}
return &VolcanoBatchScheduler{
@@ -156,9 +238,9 @@ func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, c
}

func (vf *VolcanoBatchSchedulerFactory) AddToScheme(scheme *runtime.Scheme) {
utilruntime.Must(volcanov1beta1.AddToScheme(scheme))
utilruntime.Must(volcanoschedulingv1beta1.AddToScheme(scheme))
}

func (vf *VolcanoBatchSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
return b.Owns(&volcanov1beta1.PodGroup{})
return b.Owns(&volcanoschedulingv1beta1.PodGroup{})
}