diff --git a/ray-operator/controllers/ray/common/job.go b/ray-operator/controllers/ray/common/job.go
index 05025a3e86e..0d3735b22c7 100644
--- a/ray-operator/controllers/ray/common/job.go
+++ b/ray-operator/controllers/ray/common/job.go
@@ -14,6 +14,7 @@ import (
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+	"github.com/ray-project/kuberay/ray-operator/pkg/features"
 	pkgutils "github.com/ray-project/kuberay/ray-operator/pkg/utils"
 )
@@ -141,10 +142,13 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
 		cmd = append(cmd, waitLoop...)
 	}

-	// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
+	// In Sidecar mode without the SidecarSubmitterRestart feature gate, we only support RayJob-level retry, which means that the submitter retry won't happen,
 	// so we won't have to check if the job has been submitted.
-	if submissionMode == rayv1.K8sJobMode {
-		// Only check job status in K8s mode to handle duplicated submission gracefully
+	// In K8sJobMode (the submitter Job may retry) or in Sidecar mode with the SidecarSubmitterRestart feature gate enabled (the container may restart on failure),
+	// we check the job status before submitting to handle duplicated submissions gracefully.
+	needsStatusCheck := submissionMode == rayv1.K8sJobMode || (submissionMode == rayv1.SidecarMode && features.Enabled(features.SidecarSubmitterRestart))
+
+	if needsStatusCheck {
 		cmd = append(cmd, "if", "!")
 		cmd = append(cmd, jobStatusCommand...)
 		cmd = append(cmd, ";", "then")
@@ -152,7 +156,7 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
 	cmd = append(cmd, jobSubmitCommand...)

-	if submissionMode == rayv1.K8sJobMode {
+	if needsStatusCheck {
 		cmd = append(cmd, "--no-wait")
 	}
@@ -190,7 +194,7 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
 	// "--" is used to separate the entrypoint from the Ray Job CLI command and its arguments.
 	cmd = append(cmd, "--", entrypoint, ";")

-	if submissionMode == rayv1.K8sJobMode {
+	if needsStatusCheck {
 		cmd = append(cmd, "fi", ";")
 		cmd = append(cmd, jobFollowCommand...)
 	}
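The effect of needsStatusCheck on the generated command is easier to see flattened out. The sketch below condenses the appends above into one function (jobStatusCommand, jobSubmitCommand, and jobFollowCommand are assembled elsewhere in BuildJobSubmitCommand; this is an illustration, not part of the patch). Shell-wise the result reads "if ! <status>; then <submit> --no-wait -- <entrypoint>; fi; <follow>", so a restarted submitter finds the job already submitted, skips resubmission, and simply re-attaches to the log stream.

// sketchGuardedSubmit mirrors the append sequence above for the
// needsStatusCheck case. The helper slices are taken as given.
func sketchGuardedSubmit(jobStatusCommand, jobSubmitCommand, jobFollowCommand []string, entrypoint string) []string {
	cmd := []string{"if", "!"}
	cmd = append(cmd, jobStatusCommand...) // job status check: succeeds iff already submitted
	cmd = append(cmd, ";", "then")
	cmd = append(cmd, jobSubmitCommand...) // submit only when the status check failed
	cmd = append(cmd, "--no-wait")         // do not block; the follow command streams logs instead
	cmd = append(cmd, "--", entrypoint, ";")
	cmd = append(cmd, "fi", ";")
	return append(cmd, jobFollowCommand...) // always tail logs, whether or not we submitted
}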
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index b5492f93115..a18798e9767 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -272,14 +272,17 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		}
 		var finishedAt *time.Time
-		if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
+		if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode ||
+			rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
 			var shouldUpdate bool
 			shouldUpdate, finishedAt, err = r.checkSubmitterAndUpdateStatusIfNeeded(ctx, rayJobInstance)
 			if err != nil {
 				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
 			}
-			if checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
+			// For K8sJobMode, check the submitter-finished timeout before querying the dashboard.
+			if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode &&
+				checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
 				break
 			}
@@ -318,6 +321,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 				break
 			}
 		}
+		// In Sidecar mode, apply the timeout only as a fallback, when the dashboard is unreachable and the submitter has finished.
+		if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
+			if checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
+				// rayJobInstance.Status has been marked Failed.
+				break
+			}
+		}
 		logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
 		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
@@ -335,6 +345,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 			isJobTerminal = isJobTerminal && finishedAt != nil
 		}
+		// Inform the user when the submitter has exited (finishedAt != nil) but the job is still running (!isJobTerminal).
+		if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode && finishedAt != nil && !isJobTerminal {
+			logger.Info("Submitter container exited but Ray job is still running.",
+				"JobId", rayJobInstance.Status.JobId, "JobStatus", jobInfo.JobStatus)
+		}
+
 		if isJobTerminal {
 			jobDeploymentStatus = rayv1.JobDeploymentStatusComplete
 			if jobInfo.JobStatus == rayv1.JobStatusFailed {
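The two call sites of checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded above encode an ordering that is easy to miss, so here is a toy model of it (stand-in names and return values, not the actual Reconcile body): in K8sJobMode the timeout is checked before the dashboard query, while in Sidecar mode a reachable dashboard always wins and the timeout only fires after the dashboard query has failed.

import rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

// sketchTimeoutOrdering is a condensed, hypothetical model of the flow above.
// timedOut stands in for checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded.
func sketchTimeoutOrdering(mode rayv1.JobSubmissionMode, dashboardErr error, timedOut func() bool) string {
	if mode == rayv1.K8sJobMode && timedOut() {
		return "Failed" // submitter Job finished, but no terminal job state arrived in time
	}
	if dashboardErr != nil {
		if mode == rayv1.SidecarMode && timedOut() {
			return "Failed" // dashboard unreachable and submitter long finished: give up
		}
		return "Requeue" // possibly transient dashboard error: retry next reconciliation
	}
	return "UseDashboardStatus" // a live dashboard is the source of truth
}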
@@ -586,6 +602,23 @@ func getSubmitterContainer(rayJobInstance *rayv1.RayJob, rayClusterInstance *ray
 		return corev1.Container{}, err
 	}
+	// When the SidecarSubmitterRestart feature gate is enabled, configure per-container restart rules.
+	// This requires Kubernetes 1.34+ with the ContainerRestartRules feature gate enabled.
+	if features.Enabled(features.SidecarSubmitterRestart) {
+		submitterContainer.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyNever)
+		submitterContainer.RestartPolicyRules = []corev1.ContainerRestartRule{
+			{
+				Action: corev1.ContainerRestartRuleActionRestart,
+				ExitCodes: &corev1.ContainerRestartRuleOnExitCodes{
+					// Restart on any non-zero exit code (transient failures).
+					// TODO: confirm whether any non-zero exit code should trigger a restart, or only specific error codes.
+					Operator: corev1.ContainerRestartRuleOnExitCodesOpNotIn,
+					Values:   []int32{0},
+				},
+			},
+		}
+	}
+
 	return submitterContainer, nil
 }
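To make the NotIn semantics concrete, the predicate below models how a rule of this shape selects exit codes (it mirrors the rule's fields as used in this patch, not kubelet source, so treat it as an approximation): with Operator NotIn and Values=[0], the container restarts exactly when it exits non-zero, while an exit code of 0 leaves it terminated, which is what the container-level RestartPolicy of Never specifies.

import corev1 "k8s.io/api/core/v1"

// ruleTriggersRestart approximates the evaluation of the single rule
// configured above: restart when the exit code is NOT in Values.
func ruleTriggersRestart(exitCode int32, rule corev1.ContainerRestartRule) bool {
	if rule.Action != corev1.ContainerRestartRuleActionRestart || rule.ExitCodes == nil {
		return false
	}
	if rule.ExitCodes.Operator != corev1.ContainerRestartRuleOnExitCodesOpNotIn {
		return false // only the NotIn form used in this patch is modeled here
	}
	for _, v := range rule.ExitCodes.Values {
		if exitCode == v {
			return false // exit code is listed (e.g. 0): fall back to RestartPolicy Never
		}
	}
	return true // any unlisted (non-zero) exit code triggers a restart
}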
@@ -1015,7 +1048,6 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
 	logger := ctrl.LoggerFrom(ctx)
 	shouldUpdate = false
 	finishedAt = nil
-	var submitterContainerStatus *corev1.ContainerStatus
 	var condition *batchv1.JobCondition

 	switch rayJob.Spec.SubmissionMode {
@@ -1044,23 +1076,6 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
 			return
 		}
-		shouldUpdate, submitterContainerStatus = checkSidecarContainerStatus(headPod)
-		if shouldUpdate {
-			logger.Info("The submitter sidecar container has failed. Attempting to transition the status to `Failed`.",
-				"Submitter sidecar container", submitterContainerStatus.Name, "Reason", submitterContainerStatus.State.Terminated.Reason, "Message", submitterContainerStatus.State.Terminated.Message)
-			rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
-			// The submitter sidecar container needs to wait for the user code to finish and retrieve its logs.
-			// Therefore, a failed Submitter sidecar container indicates that the submission itself has failed or the user code has thrown an error.
-			// If the failure is due to user code, the JobStatus and Job message will be updated accordingly from the previous reconciliation.
-			if rayJob.Status.JobStatus == rayv1.JobStatusFailed {
-				rayJob.Status.Reason = rayv1.AppFailed
-			} else {
-				rayJob.Status.Reason = rayv1.SubmissionFailed
-				rayJob.Status.Message = fmt.Sprintf("Ray head pod container %s terminated with exit code %d: %s",
-					submitterContainerStatus.Name, submitterContainerStatus.State.Terminated.ExitCode, submitterContainerStatus.State.Terminated.Reason)
-			}
-		}
-
 		finishedAt = getSubmitterContainerFinishedTime(headPod)
 		return
 	case rayv1.K8sJobMode:
@@ -1126,21 +1141,6 @@ func getJobFinishedCondition(job *batchv1.Job) *batchv1.JobCondition {
 	return nil
 }
-func checkSidecarContainerStatus(headPod *corev1.Pod) (bool, *corev1.ContainerStatus) {
-	for _, containerStatus := range headPod.Status.ContainerStatuses {
-		if containerStatus.Name == utils.SubmitterContainerName {
-			// Check for terminated containers with error exit codes
-			// Based on the document, "ray job submit" will exit with 0 if the job succeeded, or exit with 1 if it failed.
-			// https://docs.ray.io/en/latest/cluster/running-applications/job-submission/cli.html#ray-job-submit
-			if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode != 0 {
-				return true, &containerStatus
-			}
-			break
-		}
-	}
-	return false, nil
-}
-
 func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
 	logger := ctrl.LoggerFrom(ctx)
 	if rayJob.Spec.ActiveDeadlineSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.ActiveDeadlineSeconds)*time.Second)) {
diff --git a/ray-operator/controllers/ray/rayjob_controller_unit_test.go b/ray-operator/controllers/ray/rayjob_controller_unit_test.go
index eb37b1a729a..14e8d61c407 100644
--- a/ray-operator/controllers/ray/rayjob_controller_unit_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_unit_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics/mocks"
 	utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
 	"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
+	"github.com/ray-project/kuberay/ray-operator/pkg/features"
 )

 func TestCreateRayJobSubmitterIfNeed(t *testing.T) {
@@ -201,6 +202,106 @@ func TestGetSubmitterTemplate(t *testing.T) {
 	assert.Equal(t, "test-job-id", envVar.Value)
 }
+func TestGetSubmitterContainerWithFeatureGate(t *testing.T) {
+	// Enable the SidecarSubmitterRestart feature gate for this test.
+	features.SetFeatureGateDuringTest(t, features.SidecarSubmitterRestart, true)
+
+	rayJobInstance := &rayv1.RayJob{
+		Spec: rayv1.RayJobSpec{
+			Entrypoint:     "echo test",
+			SubmissionMode: rayv1.SidecarMode,
+			RayClusterSpec: &rayv1.RayClusterSpec{
+				HeadGroupSpec: rayv1.HeadGroupSpec{
+					Template: corev1.PodTemplateSpec{
+						Spec: corev1.PodSpec{
+							Containers: []corev1.Container{
+								{
+									Image: "rayproject/ray:test",
+									Ports: []corev1.ContainerPort{
+										{
+											Name:          utils.DashboardPortName,
+											ContainerPort: utils.DefaultDashboardPort,
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		Status: rayv1.RayJobStatus{
+			DashboardURL: "http://127.0.0.1:8265",
+			JobId:        "test-job-id",
+		},
+	}
+
+	rayClusterInstance := &rayv1.RayCluster{
+		Spec: *rayJobInstance.Spec.RayClusterSpec,
+	}
+
+	container, err := getSubmitterContainer(rayJobInstance, rayClusterInstance)
+	require.NoError(t, err)
+
+	// Verify the restart policy is set to Never.
+	require.NotNil(t, container.RestartPolicy)
+	assert.Equal(t, corev1.ContainerRestartPolicyNever, *container.RestartPolicy)
+
+	// Verify the restart policy rules are set.
+	require.Len(t, container.RestartPolicyRules, 1)
+	rule := container.RestartPolicyRules[0]
+	assert.Equal(t, corev1.ContainerRestartRuleActionRestart, rule.Action)
+	require.NotNil(t, rule.ExitCodes)
+	assert.Equal(t, corev1.ContainerRestartRuleOnExitCodesOpNotIn, rule.ExitCodes.Operator)
+	assert.Equal(t, []int32{0}, rule.ExitCodes.Values)
+}
+
+func TestGetSubmitterContainerWithoutFeatureGate(t *testing.T) {
+	// Explicitly disable the feature gate.
+	features.SetFeatureGateDuringTest(t, features.SidecarSubmitterRestart, false)
+
+	rayJobInstance := &rayv1.RayJob{
+		Spec: rayv1.RayJobSpec{
+			Entrypoint:     "echo test",
+			SubmissionMode: rayv1.SidecarMode,
+			RayClusterSpec: &rayv1.RayClusterSpec{
+				HeadGroupSpec: rayv1.HeadGroupSpec{
+					Template: corev1.PodTemplateSpec{
+						Spec: corev1.PodSpec{
+							Containers: []corev1.Container{
+								{
+									Image: "rayproject/ray:test",
+									Ports: []corev1.ContainerPort{
+										{
+											Name:          utils.DashboardPortName,
+											ContainerPort: utils.DefaultDashboardPort,
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		Status: rayv1.RayJobStatus{
+			DashboardURL: "http://127.0.0.1:8265",
+			JobId:        "test-job-id",
+		},
+	}
+
+	rayClusterInstance := &rayv1.RayCluster{
+		Spec: *rayJobInstance.Spec.RayClusterSpec,
+	}
+
+	container, err := getSubmitterContainer(rayJobInstance, rayClusterInstance)
+	require.NoError(t, err)
+
+	// Verify the restart policy is NOT set (nil) when the feature gate is disabled.
+	assert.Nil(t, container.RestartPolicy)
+	assert.Empty(t, container.RestartPolicyRules)
+}
+
 func TestUpdateStatusToSuspendingIfNeeded(t *testing.T) {
 	newScheme := runtime.NewScheme()
 	_ = rayv1.AddToScheme(newScheme)
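Both tests rely on features.SetFeatureGateDuringTest, the helper referenced at the bottom of features.go below, which, as its name and test-helper role suggest, overrides a gate for the duration of a single test and restores the previous value afterwards, so gate-dependent tests do not leak state into one another. A minimal usage sketch (the test name here is hypothetical):

func TestSomethingGated(t *testing.T) {
	// Scoped override: the prior gate value is restored when the test ends.
	features.SetFeatureGateDuringTest(t, features.SidecarSubmitterRestart, true)

	// ... exercise code paths guarded by features.Enabled(features.SidecarSubmitterRestart)
}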
diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go
index 072e468ca81..1771265910f 100644
--- a/ray-operator/controllers/ray/utils/util.go
+++ b/ray-operator/controllers/ray/utils/util.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/version"
 	"k8s.io/client-go/discovery"
 	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -881,6 +882,34 @@ func GetClusterType() bool {
 	return false
 }
+func GetKubernetesVersion() (*version.Info, error) {
+	config, err := ctrl.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	serverVersion, err := discoveryClient.ServerVersion()
+	if err != nil {
+		return nil, err
+	}
+	return serverVersion, nil
+}
+
+func IsK8sVersionAtLeast(serverVersion *version.Info, major, minor int) bool {
+	majorVersion, _ := strconv.Atoi(serverVersion.Major)
+	// Minor can have a "+" suffix (e.g., "34+"), so trim it before parsing.
+	minorVersion, _ := strconv.Atoi(strings.TrimSuffix(serverVersion.Minor, "+"))
+	if majorVersion < major || (majorVersion == major && minorVersion < minor) {
+		return false
+	}
+	return true
+}
+
 func GetContainerCommand(additionalOptions []string) []string {
 	bashOptions := []string{"c"}
 	bashOptions = append(bashOptions, additionalOptions...)
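A quick table-driven check of the comparison helper, covering the "+"-suffixed minor version that some managed providers report; this test is illustrative and not part of the patch. Note that if either component fails to parse, strconv.Atoi yields 0 and the helper reports false unless the major version alone already clears the bar.

import (
	"testing"

	"k8s.io/apimachinery/pkg/version"
)

func TestIsK8sVersionAtLeastSketch(t *testing.T) {
	cases := []struct {
		major, minor string
		want         bool
	}{
		{"1", "34", true},  // exact minimum
		{"1", "34+", true}, // "+" suffix is trimmed before parsing
		{"1", "33", false}, // one minor release too old
		{"2", "0", true},   // higher major always passes
	}
	for _, c := range cases {
		got := IsK8sVersionAtLeast(&version.Info{Major: c.major, Minor: c.minor}, 1, 34)
		if got != c.want {
			t.Errorf("IsK8sVersionAtLeast(%s.%s, 1, 34) = %v, want %v", c.major, c.minor, got, c.want)
		}
	}
}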
diff --git a/ray-operator/main.go b/ray-operator/main.go
index d06fddc8a7d..e0e81b734a2 100644
--- a/ray-operator/main.go
+++ b/ray-operator/main.go
@@ -196,6 +196,17 @@ func main() {
 		utilruntime.Must(gwv1.AddToScheme(scheme))
 	}
+	// Fail fast at startup if SidecarSubmitterRestart is enabled but the cluster runs Kubernetes < 1.34.
+	if features.Enabled(features.SidecarSubmitterRestart) {
+		serverVersion, err := utils.GetKubernetesVersion()
+		if err != nil {
+			exitOnError(err, "SidecarSubmitterRestart feature gate is enabled but the Kubernetes version could not be detected. The feature requires Kubernetes 1.34+.")
+		} else if !utils.IsK8sVersionAtLeast(serverVersion, 1, 34) {
+			exitOnError(fmt.Errorf("current version %s is below 1.34", serverVersion.GitVersion),
+				"SidecarSubmitterRestart feature gate requires Kubernetes 1.34+")
+		}
+	}
+
 	// Manager options
 	options := ctrl.Options{
 		Cache: cache.Options{
diff --git a/ray-operator/pkg/features/features.go b/ray-operator/pkg/features/features.go
index 6a9bb622fd5..f14f28edcbf 100644
--- a/ray-operator/pkg/features/features.go
+++ b/ray-operator/pkg/features/features.go
@@ -53,6 +53,14 @@ const (
 	//
 	// Enables asynchronous job info querying.
 	AsyncJobInfoQuery featuregate.Feature = "AsyncJobInfoQuery"
+
+	// owner: @justinyeh1995
+	// rep: N/A
+	// alpha: v1.6
+	//
+	// Enables a per-container restart policy for the SidecarMode submitter to handle transient failures.
+	// Requires Kubernetes 1.34+ with the ContainerRestartRules feature gate enabled.
+	SidecarSubmitterRestart featuregate.Feature = "SidecarSubmitterRestart"
 )

 func init() {
@@ -66,6 +74,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
 	RayServiceIncrementalUpgrade: {Default: false, PreRelease: featuregate.Alpha},
 	RayCronJob:                   {Default: false, PreRelease: featuregate.Alpha},
 	AsyncJobInfoQuery:            {Default: false, PreRelease: featuregate.Alpha},
+	SidecarSubmitterRestart:      {Default: false, PreRelease: featuregate.Alpha},
 }

 // SetFeatureGateDuringTest is a helper method to override feature gates in tests.
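As with the other alpha gates registered above, SidecarSubmitterRestart defaults to off. Enabling the feature end to end presumably takes two switches: the operator's own gate, via its --feature-gates flag (e.g. --feature-gates=SidecarSubmitterRestart=true, following the existing feature-gate wiring), and the cluster-level ContainerRestartRules gate on Kubernetes 1.34+, since the submitter container spec now carries RestartPolicyRules. If the cluster is too old, the startup guard added to main.go exits immediately rather than producing submitter containers whose restart rules an older API server would silently drop.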