Skip to content
Open
14 changes: 9 additions & 5 deletions ray-operator/controllers/ray/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
pkgutils "github.com/ray-project/kuberay/ray-operator/pkg/utils"
)

Expand Down Expand Up @@ -141,18 +142,21 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
cmd = append(cmd, waitLoop...)
}

// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
// In Sidecar mode without SidecarSubmitterRestart feature gate enabled, we only support RayJob level retry, which means that the submitter retry won't happen,
// so we won't have to check if the job has been submitted.
if submissionMode == rayv1.K8sJobMode {
// Only check job status in K8s mode to handle duplicated submission gracefully
// In K8sJobMode (submitter Job may retry) or Sidecar mode with SidecarSubmitterRestart feature gate enabled (container may restart on failure).
// we check job status before submitting to handle duplicated submission gracefully.
needsStatusCheck := submissionMode == rayv1.K8sJobMode || (submissionMode == rayv1.SidecarMode && features.Enabled(features.SidecarSubmitterRestart))

if needsStatusCheck {
cmd = append(cmd, "if", "!")
cmd = append(cmd, jobStatusCommand...)
cmd = append(cmd, ";", "then")
}

cmd = append(cmd, jobSubmitCommand...)

if submissionMode == rayv1.K8sJobMode {
if needsStatusCheck {
cmd = append(cmd, "--no-wait")
}

Expand Down Expand Up @@ -190,7 +194,7 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo

// "--" is used to separate the entrypoint from the Ray Job CLI command and its arguments.
cmd = append(cmd, "--", entrypoint, ";")
if submissionMode == rayv1.K8sJobMode {
if needsStatusCheck {
cmd = append(cmd, "fi", ";")
cmd = append(cmd, jobFollowCommand...)
}
Expand Down
70 changes: 35 additions & 35 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,17 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

var finishedAt *time.Time
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode ||
rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
var shouldUpdate bool
shouldUpdate, finishedAt, err = r.checkSubmitterAndUpdateStatusIfNeeded(ctx, rayJobInstance)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

if checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
// For K8sJobMode, check timeout before dashboard check
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode &&
checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
break
}

Expand Down Expand Up @@ -318,6 +321,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}
}
// If in Sidecar mode, apply timeout as fallback if dashboard is unreachable and submitter has finished
if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
if checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
// rayJobInstance.Status marked to Failed
break
}
}

logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
Expand All @@ -335,6 +345,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
isJobTerminal = isJobTerminal && finishedAt != nil
}

// inform the user when submitter exited (finishedAt != nil) but job is still running (!isJobTerminal).
if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode && finishedAt != nil && !isJobTerminal {
logger.Info("Submitter container exited but Ray job is still running.",
"JobId", rayJobInstance.Status.JobId, "JobStatus", jobInfo.JobStatus)
}

if isJobTerminal {
jobDeploymentStatus = rayv1.JobDeploymentStatusComplete
if jobInfo.JobStatus == rayv1.JobStatusFailed {
Expand Down Expand Up @@ -586,6 +602,23 @@ func getSubmitterContainer(rayJobInstance *rayv1.RayJob, rayClusterInstance *ray
return corev1.Container{}, err
}

// When SidecarSubmitterRestart feature gate is enabled, configure per-container restart rules.
// This requires Kubernetes 1.34+ with ContainerRestartRules feature gate enabled.
if features.Enabled(features.SidecarSubmitterRestart) {
submitterContainer.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyNever)
submitterContainer.RestartPolicyRules = []corev1.ContainerRestartRule{
{
Action: corev1.ContainerRestartRuleActionRestart,
ExitCodes: &corev1.ContainerRestartRuleOnExitCodes{
// Restart on any non-zero exit code (transient failures)
// TODO check if it is non-zero or a specific error code again
Operator: corev1.ContainerRestartRuleOnExitCodesOpNotIn,
Values: []int32{0},
Comment on lines +615 to +616
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it ok if we restart on any non-zero error code?

},
},
}
}

return submitterContainer, nil
}

Expand Down Expand Up @@ -1015,7 +1048,6 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
logger := ctrl.LoggerFrom(ctx)
shouldUpdate = false
finishedAt = nil
var submitterContainerStatus *corev1.ContainerStatus
var condition *batchv1.JobCondition

switch rayJob.Spec.SubmissionMode {
Expand Down Expand Up @@ -1044,23 +1076,6 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
return
}

shouldUpdate, submitterContainerStatus = checkSidecarContainerStatus(headPod)
if shouldUpdate {
logger.Info("The submitter sidecar container has failed. Attempting to transition the status to `Failed`.",
"Submitter sidecar container", submitterContainerStatus.Name, "Reason", submitterContainerStatus.State.Terminated.Reason, "Message", submitterContainerStatus.State.Terminated.Message)
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
// The submitter sidecar container needs to wait for the user code to finish and retrieve its logs.
// Therefore, a failed Submitter sidecar container indicates that the submission itself has failed or the user code has thrown an error.
// If the failure is due to user code, the JobStatus and Job message will be updated accordingly from the previous reconciliation.
if rayJob.Status.JobStatus == rayv1.JobStatusFailed {
rayJob.Status.Reason = rayv1.AppFailed
} else {
rayJob.Status.Reason = rayv1.SubmissionFailed
rayJob.Status.Message = fmt.Sprintf("Ray head pod container %s terminated with exit code %d: %s",
submitterContainerStatus.Name, submitterContainerStatus.State.Terminated.ExitCode, submitterContainerStatus.State.Terminated.Reason)
}
}

finishedAt = getSubmitterContainerFinishedTime(headPod)
return
case rayv1.K8sJobMode:
Expand Down Expand Up @@ -1126,21 +1141,6 @@ func getJobFinishedCondition(job *batchv1.Job) *batchv1.JobCondition {
return nil
}

func checkSidecarContainerStatus(headPod *corev1.Pod) (bool, *corev1.ContainerStatus) {
for _, containerStatus := range headPod.Status.ContainerStatuses {
if containerStatus.Name == utils.SubmitterContainerName {
// Check for terminated containers with error exit codes
// Based on the document, "ray job submit" will exit with 0 if the job succeeded, or exit with 1 if it failed.
// https://docs.ray.io/en/latest/cluster/running-applications/job-submission/cli.html#ray-job-submit
if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode != 0 {
return true, &containerStatus
}
break
}
}
return false, nil
}

func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
logger := ctrl.LoggerFrom(ctx)
if rayJob.Spec.ActiveDeadlineSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.ActiveDeadlineSeconds)*time.Second)) {
Expand Down
101 changes: 101 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics/mocks"
utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)

func TestCreateRayJobSubmitterIfNeed(t *testing.T) {
Expand Down Expand Up @@ -201,6 +202,106 @@ func TestGetSubmitterTemplate(t *testing.T) {
assert.Equal(t, "test-job-id", envVar.Value)
}

func TestGetSubmitterContainerWithFeatureGate(t *testing.T) {
// Enable the SidecarSubmitterRestart feature gate for this test
features.SetFeatureGateDuringTest(t, features.SidecarSubmitterRestart, true)

rayJobInstance := &rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Entrypoint: "echo test",
SubmissionMode: rayv1.SidecarMode,
RayClusterSpec: &rayv1.RayClusterSpec{
HeadGroupSpec: rayv1.HeadGroupSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "rayproject/ray:test",
Ports: []corev1.ContainerPort{
{
Name: utils.DashboardPortName,
ContainerPort: utils.DefaultDashboardPort,
},
},
},
},
},
},
},
},
},
Status: rayv1.RayJobStatus{
DashboardURL: "http://127.0.0.1:8265",
JobId: "test-job-id",
},
}

rayClusterInstance := &rayv1.RayCluster{
Spec: *rayJobInstance.Spec.RayClusterSpec,
}

container, err := getSubmitterContainer(rayJobInstance, rayClusterInstance)
require.NoError(t, err)

// Verify restart policy is set to Never
require.NotNil(t, container.RestartPolicy)
assert.Equal(t, corev1.ContainerRestartPolicyNever, *container.RestartPolicy)

// Verify restart policy rules are set
require.Len(t, container.RestartPolicyRules, 1)
rule := container.RestartPolicyRules[0]
assert.Equal(t, corev1.ContainerRestartRuleActionRestart, rule.Action)
require.NotNil(t, rule.ExitCodes)
assert.Equal(t, corev1.ContainerRestartRuleOnExitCodesOpNotIn, rule.ExitCodes.Operator)
assert.Equal(t, []int32{0}, rule.ExitCodes.Values)
}

func TestGetSubmitterContainerWithoutFeatureGate(t *testing.T) {
// Explicitly disable the feature gate
features.SetFeatureGateDuringTest(t, features.SidecarSubmitterRestart, false)

rayJobInstance := &rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Entrypoint: "echo test",
SubmissionMode: rayv1.SidecarMode,
RayClusterSpec: &rayv1.RayClusterSpec{
HeadGroupSpec: rayv1.HeadGroupSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "rayproject/ray:test",
Ports: []corev1.ContainerPort{
{
Name: utils.DashboardPortName,
ContainerPort: utils.DefaultDashboardPort,
},
},
},
},
},
},
},
},
},
Status: rayv1.RayJobStatus{
DashboardURL: "http://127.0.0.1:8265",
JobId: "test-job-id",
},
}

rayClusterInstance := &rayv1.RayCluster{
Spec: *rayJobInstance.Spec.RayClusterSpec,
}

container, err := getSubmitterContainer(rayJobInstance, rayClusterInstance)
require.NoError(t, err)

// Verify restart policy is NOT set (nil) when feature gate is disabled
assert.Nil(t, container.RestartPolicy)
assert.Empty(t, container.RestartPolicyRules)
}

func TestUpdateStatusToSuspendingIfNeeded(t *testing.T) {
newScheme := runtime.NewScheme()
_ = rayv1.AddToScheme(newScheme)
Expand Down
29 changes: 29 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -881,6 +882,34 @@ func GetClusterType() bool {
return false
}

func GetKubernetesVersion() (*version.Info, error) {
config, err := ctrl.GetConfig()
if err != nil {
return nil, err
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}

serverVersion, err := discoveryClient.ServerVersion()
if err != nil {
return nil, err
}
return serverVersion, nil
}

func IsK8sVersionAtLeast(serverVersion *version.Info, major, minor int) bool {
majorVersion, _ := strconv.Atoi(serverVersion.Major)
// Minor can have "+" suffix (e.g., "34+"), need to trim it
minorVersion, _ := strconv.Atoi(strings.TrimSuffix(serverVersion.Minor, "+"))
if majorVersion < major || (majorVersion == major && minorVersion < minor) {
return false
}
return true
}

func GetContainerCommand(additionalOptions []string) []string {
bashOptions := []string{"c"}
bashOptions = append(bashOptions, additionalOptions...)
Expand Down
11 changes: 11 additions & 0 deletions ray-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ func main() {
utilruntime.Must(gwv1.AddToScheme(scheme))
}

// Fail if SidecarSubmitterRestart is enabled but K8s < 1.34
if features.Enabled(features.SidecarSubmitterRestart) {
serverVersion, err := utils.GetKubernetesVersion()
if err != nil {
exitOnError(err, "SidecarSubmitterRestart feature gate enabled but unable to detect K8s version. Feature requires K8s 1.34+.")
} else if !utils.IsK8sVersionAtLeast(serverVersion, 1, 34) {
exitOnError(fmt.Errorf("current version %s is below 1.34", serverVersion.GitVersion),
"SidecarSubmitterRestart feature gate requires K8s 1.34+")
}
}

// Manager options
options := ctrl.Options{
Cache: cache.Options{
Expand Down
9 changes: 9 additions & 0 deletions ray-operator/pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ const (
//
// Enables asynchronous job info querying.
AsyncJobInfoQuery featuregate.Feature = "AsyncJobInfoQuery"

// owner: @justinyeh1995
// rep: N/A
// alpha: v1.6
//
// Enables per-container restart policy for SidecarMode submitter to handle transient failures.
// Requires Kubernetes 1.34+ with ContainerRestartRules feature gate enabled.
SidecarSubmitterRestart featuregate.Feature = "SidecarSubmitterRestart"
)

func init() {
Expand All @@ -66,6 +74,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
RayServiceIncrementalUpgrade: {Default: false, PreRelease: featuregate.Alpha},
RayCronJob: {Default: false, PreRelease: featuregate.Alpha},
AsyncJobInfoQuery: {Default: false, PreRelease: featuregate.Alpha},
SidecarSubmitterRestart: {Default: false, PreRelease: featuregate.Alpha},
}

// SetFeatureGateDuringTest is a helper method to override feature gates in tests.
Expand Down
Loading