14 changes: 10 additions & 4 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -152,8 +152,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
rayJobInstance.Status.Reason = rayv1.ValidationFailed
rayJobInstance.Status.Message = err.Error()

// This is the only 2 places where we update the RayJob status. This will directly
// update the JobDeploymentStatus to ValidationFailed if there's validation error
// This is one of the only 3 places where we update the RayJob status. This will directly
// update the JobDeploymentStatus to ValidationFailed if there's validation error.
if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
logger.Info("Failed to update RayJob status", "error", err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
@@ -204,6 +204,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if clientURL := rayJobInstance.Status.DashboardURL; clientURL == "" {
if rayClusterInstance.Status.State != rayv1.Ready {
logger.Info("Wait for the RayCluster.Status.State to be ready before submitting the job.", "RayCluster", rayClusterInstance.Name, "State", rayClusterInstance.Status.State)
// This is one of only 3 places where we update the RayJob status. For observability
Member:
We should try our best to avoid calling UpdateStatus in different places. This caused a lot of issues before (that's why I fully rewrote the RayJob controller).

Ideally, we should make every reconciliation go through r.updateRayJobStatus at L430 and update the status there. It is OK if this is a blocker for the patch release (but I guess not).

Please revisit it @rueian @spencer-p. Thanks.

Collaborator:
How about using goto to jump to L430?

Member:
Thank you @spencer-p!

Member:
> How about using goto to jump to L430?

I am not a fan of goto (goto is generally discouraged). Maybe wrap the switch into a function instead, to avoid returning directly:

func newFunction(...) (ctrl.Result, error) {
  switch ...
}

result, err := newFunction(...)
// update status and handle error
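
Spelled out a little further, the refactor suggested here could look roughly like the sketch below. The helper name reconcileJobDeployment and the shown return values are illustrative only (they are not from this PR); the real Reconcile has many more branches, and the exact ordering relative to checkBackoffLimitAndUpdateStatusIfNeeded would need to be preserved.

// Hypothetical helper: every branch only mutates rayJobInstance.Status in
// memory and returns; none of them calls updateRayJobStatus itself.
func (r *RayJobReconciler) reconcileJobDeployment(ctx context.Context, rayJobInstance *rayv1.RayJob) (ctrl.Result, error) {
	switch rayJobInstance.Status.JobDeploymentStatus {
	// ... existing cases, with their early r.updateRayJobStatus calls
	// replaced by plain returns ...
	default:
		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
	}
}

// Inside Reconcile, the single status write stays at the very end:
result, reconcileErr := r.reconcileJobDeployment(ctx, rayJobInstance)
checkBackoffLimitAndUpdateStatusIfNeeded(ctx, rayJobInstance)
if err := r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
	logger.Info("Failed to update RayJob status", "error", err)
	return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
return result, reconcileErr

With that shape, the "one of the only 3 places" comments in this diff would collapse back to a single call site, which is the invariant the reviewer wants to keep.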

// while waiting for the RayCluster to become ready, we lift the cluster status.
rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
logger.Info("Failed to update RayJob status", "error", err)
}
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

@@ -419,8 +425,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}
checkBackoffLimitAndUpdateStatusIfNeeded(ctx, rayJobInstance)

// This is the only 2 places where we update the RayJob status. Please do NOT add any code
// between `checkBackoffLimitAndUpdateStatusIfNeeded` and the following code.
// This is one of the only 3 places where we update the RayJob status. Please do NOT add any
// code between `checkBackoffLimitAndUpdateStatusIfNeeded` and the following code.
if err = r.updateRayJobStatus(ctx, originalRayJobInstance, rayJobInstance); err != nil {
logger.Info("Failed to update RayJob status", "error", err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
28 changes: 28 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller_test.go
@@ -241,6 +241,34 @@ var _ = Context("RayJob with different submission modes", func() {
Expect(rayCluster.Annotations).Should(Equal(rayJob.Annotations))
})

It("In Initializing state, the JobStatus should show the RayCluster status", func() {
// The RayCluster is not 'Ready' yet because Pods are not running and ready.
Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready))

updateHeadPodToRunningNotReady(ctx, rayJob.Status.RayClusterName, namespace)

// Now the cluster should have nonzero conditions.
Eventually(
func() int {
status := getClusterStatus(ctx, namespace, rayCluster.Name)()
return len(status.Conditions)
},
time.Second*3, time.Millisecond*500).ShouldNot(Equal(0))

// We expect the RayJob's RayClusterStatus to eventually mirror the cluster's status.
Eventually(
func() (int, error) {
currentRayJob := &rayv1.RayJob{}
err := k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, currentRayJob)
if err != nil {
return 0, err
}
return len(currentRayJob.Status.RayClusterStatus.Conditions), nil
},
time.Second*3, time.Millisecond*500,
).ShouldNot(Equal(0))
})

It("Make RayCluster.Status.State to be rayv1.Ready", func() {
// The RayCluster is not 'Ready' yet because Pods are not running and ready.
Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready))
45 changes: 33 additions & 12 deletions ray-operator/controllers/ray/suite_helpers_test.go
@@ -249,6 +249,36 @@ func checkServeApplicationExists(ctx context.Context, rayService *rayv1.RayServi
// So Pods are created, but no controller updates them from Pending to Running.
// See https://book.kubebuilder.io/reference/envtest.html for more details.
func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string, namespace string) {
updateHeadPodToPhaseAndConditions(ctx, rayClusterName, namespace, corev1.PodRunning, []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
})
}

func updateHeadPodToRunningNotReady(ctx context.Context, rayClusterName string, namespace string) {
updateHeadPodToPhaseAndConditions(ctx, rayClusterName, namespace, corev1.PodRunning, []corev1.PodCondition{
{
Type: corev1.PodScheduled,
Status: corev1.ConditionTrue,
},
{
Type: corev1.PodInitialized,
Status: corev1.ConditionTrue,
},
{
Type: corev1.PodReady,
Status: corev1.ConditionFalse,
},
{
Type: corev1.ContainersReady,
Status: corev1.ConditionFalse,
},
})
}

func updateHeadPodToPhaseAndConditions(ctx context.Context, rayClusterName string, namespace string, phase corev1.PodPhase, conditions []corev1.PodCondition) {
var instance rayv1.RayCluster
gomega.Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: rayClusterName, Namespace: namespace}, &instance),
@@ -262,19 +292,10 @@ func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string,
time.Second*3, time.Millisecond*500).Should(gomega.Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items)

headPod := headPods.Items[0]
headPod.Status.Phase = corev1.PodRunning
headPod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
}
headPod.Status.Phase = phase
headPod.Status.Conditions = conditions
err := k8sClient.Status().Update(ctx, &headPod)
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update head Pod status to PodRunning")

// Make sure the head Pod is updated.
gomega.Eventually(
isAllPodsRunningByFilters).WithContext(ctx).WithArguments(headPods, headLabels).WithTimeout(time.Second*15).WithPolling(time.Millisecond*500).Should(gomega.BeTrue(), "Head Pod should be running: %v", headPods.Items)
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update head Pod status to not ready")
}

// Update the status of the worker Pods to Running and Ready. Similar to updateHeadPodToRunningAndReady.