diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index c6832920357..048d5b33758 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -152,8 +152,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		rayJobInstance.Status.Reason = rayv1.ValidationFailed
 		rayJobInstance.Status.Message = err.Error()
 
-		// This is the only 2 places where we update the RayJob status. This will directly
-		// update the JobDeploymentStatus to ValidationFailed if there's validation error
+		// This is one of only three places where we update the RayJob status. This will directly
+		// update the JobDeploymentStatus to ValidationFailed if there is a validation error.
 		if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
 			logger.Info("Failed to update RayJob status", "error", err)
 			return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
@@ -204,6 +204,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		if clientURL := rayJobInstance.Status.DashboardURL; clientURL == "" {
 			if rayClusterInstance.Status.State != rayv1.Ready {
 				logger.Info("Wait for the RayCluster.Status.State to be ready before submitting the job.", "RayCluster", rayClusterInstance.Name, "State", rayClusterInstance.Status.State)
+				// This is one of only three places where we update the RayJob status. For observability
+				// while waiting for the RayCluster to become ready, we copy the RayCluster status into the RayJob status.
+				rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
+				if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
+					logger.Info("Failed to update RayJob status", "error", err)
+				}
 				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
 			}
 
@@ -419,8 +425,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 	}
 	checkBackoffLimitAndUpdateStatusIfNeeded(ctx, rayJobInstance)
 
-	// This is the only 2 places where we update the RayJob status. Please do NOT add any code
-	// between `checkBackoffLimitAndUpdateStatusIfNeeded` and the following code.
+	// This is one of only three places where we update the RayJob status. Please do NOT add any
+	// code between `checkBackoffLimitAndUpdateStatusIfNeeded` and the following code.
 	if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
 		logger.Info("Failed to update RayJob status", "error", err)
 		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go
index b9c168400fe..e8ac2be20e3 100644
--- a/ray-operator/controllers/ray/rayjob_controller_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_test.go
@@ -241,6 +241,34 @@ var _ = Context("RayJob with different submission modes", func() {
 			Expect(rayCluster.Annotations).Should(Equal(rayJob.Annotations))
 		})
 
+		It("In Initializing state, the RayJob status should reflect the RayCluster status", func() {
+			// The RayCluster is not 'Ready' yet because Pods are not running and ready.
+			Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready))
+
+			updateHeadPodToRunningNotReady(ctx, rayJob.Status.RayClusterName, namespace)
+
+			// Now the cluster should have a nonzero number of conditions.
+			Eventually(
+				func() int {
+					status := getClusterStatus(ctx, namespace, rayCluster.Name)()
+					return len(status.Conditions)
+				},
+				time.Second*3, time.Millisecond*500).ShouldNot(Equal(0))
+
+			// We expect the RayJob's RayClusterStatus to eventually mirror the cluster's status.
+			Eventually(
+				func() (int, error) {
+					currentRayJob := &rayv1.RayJob{}
+					err := k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, currentRayJob)
+					if err != nil {
+						return 0, err
+					}
+					return len(currentRayJob.Status.RayClusterStatus.Conditions), nil
+				},
+				time.Second*3, time.Millisecond*500,
+			).ShouldNot(Equal(0))
+		})
+
 		It("Make RayCluster.Status.State to be rayv1.Ready", func() {
 			// The RayCluster is not 'Ready' yet because Pods are not running and ready.
 			Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready))
diff --git a/ray-operator/controllers/ray/suite_helpers_test.go b/ray-operator/controllers/ray/suite_helpers_test.go
index 4ecd9f8aef8..7935291bdfa 100644
--- a/ray-operator/controllers/ray/suite_helpers_test.go
+++ b/ray-operator/controllers/ray/suite_helpers_test.go
@@ -249,6 +249,36 @@ func checkServeApplicationExists(ctx context.Context, rayService *rayv1.RayServi
 // So Pods are created, but no controller updates them from Pending to Running.
 // See https://book.kubebuilder.io/reference/envtest.html for more details.
 func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string, namespace string) {
+	updateHeadPodToPhaseAndConditions(ctx, rayClusterName, namespace, corev1.PodRunning, []corev1.PodCondition{
+		{
+			Type:   corev1.PodReady,
+			Status: corev1.ConditionTrue,
+		},
+	})
+}
+
+func updateHeadPodToRunningNotReady(ctx context.Context, rayClusterName string, namespace string) {
+	updateHeadPodToPhaseAndConditions(ctx, rayClusterName, namespace, corev1.PodRunning, []corev1.PodCondition{
+		{
+			Type:   corev1.PodScheduled,
+			Status: corev1.ConditionTrue,
+		},
+		{
+			Type:   corev1.PodInitialized,
+			Status: corev1.ConditionTrue,
+		},
+		{
+			Type:   corev1.PodReady,
+			Status: corev1.ConditionFalse,
+		},
+		{
+			Type:   corev1.ContainersReady,
+			Status: corev1.ConditionFalse,
+		},
+	})
+}
+
+func updateHeadPodToPhaseAndConditions(ctx context.Context, rayClusterName string, namespace string, phase corev1.PodPhase, conditions []corev1.PodCondition) {
 	var instance rayv1.RayCluster
 	gomega.Eventually(
 		getResourceFunc(ctx, client.ObjectKey{Name: rayClusterName, Namespace: namespace}, &instance),
@@ -262,19 +292,10 @@ func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string,
 		time.Second*3, time.Millisecond*500).Should(gomega.Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items)
 
 	headPod := headPods.Items[0]
-	headPod.Status.Phase = corev1.PodRunning
-	headPod.Status.Conditions = []corev1.PodCondition{
-		{
-			Type:   corev1.PodReady,
-			Status: corev1.ConditionTrue,
-		},
-	}
+	headPod.Status.Phase = phase
+	headPod.Status.Conditions = conditions
 	err := k8sClient.Status().Update(ctx, &headPod)
-	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update head Pod status to PodRunning")
-
-	// Make sure the head Pod is updated.
-	gomega.Eventually(
-		isAllPodsRunningByFilters).WithContext(ctx).WithArguments(headPods, headLabels).WithTimeout(time.Second*15).WithPolling(time.Millisecond*500).Should(gomega.BeTrue(), "Head Pod should be running: %v", headPods.Items)
+	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update head Pod status")
 }
 
 // Update the status of the worker Pods to Running and Ready. Similar to updateHeadPodToRunningAndReady.