diff --git a/clients/python-client/python_client_test/test_job_api.py b/clients/python-client/python_client_test/test_job_api.py index ba646626f5b..88622a1fc4e 100644 --- a/clients/python-client/python_client_test/test_job_api.py +++ b/clients/python-client/python_client_test/test_job_api.py @@ -510,7 +510,15 @@ def test_list_jobs(self): ) self.assertIsNotNone(status, f"Job {job_info['name']} status should be available") - result = self.api.list_jobs(k8s_namespace=namespace) + # Retry list to allow for eventual consistency (list may lag behind creates) + expected_min_count = initial_count + len(test_jobs) + result = None + for _ in range(10): + result = self.api.list_jobs(k8s_namespace=namespace) + if result and len(result.get("items", [])) >= expected_min_count: + break + time.sleep(2) + self.assertIsNotNone(result, "List jobs should return a result") self.assertIn("items", result, "Result should contain 'items' field") @@ -519,8 +527,8 @@ def test_list_jobs(self): self.assertGreaterEqual( current_count, - initial_count + len(test_jobs), - f"Should have at least {len(test_jobs)} more jobs than initially" + expected_min_count, + f"Should have at least {len(test_jobs)} more jobs than initially (got {current_count}, need {expected_min_count})" ) job_names_in_list = [item.get("metadata", {}).get("name") for item in items] diff --git a/ray-operator/controllers/ray/common/job.go b/ray-operator/controllers/ray/common/job.go index 05025a3e86e..b5ae9399204 100644 --- a/ray-operator/controllers/ray/common/job.go +++ b/ray-operator/controllers/ray/common/job.go @@ -126,13 +126,27 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo if submissionMode == rayv1.SidecarMode { // Wait until Ray Dashboard GCS is healthy before proceeding. - // Use the same Ray Dashboard GCS health check command as the readiness probe + // Use wget for Ray < 2.53 (older images), and Python when wget may be unavailable (e.g. slim images). rayDashboardGCSHealthCommand := fmt.Sprintf( - utils.BaseWgetHealthCommand, - utils.DefaultReadinessProbeFailureThreshold, + utils.BasePythonHealthCommand, port, utils.RayDashboardGCSHealthPath, + utils.DefaultReadinessProbeFailureThreshold, ) + if rayJobInstance.Spec.RayClusterSpec != nil { + if v, err := semver.NewVersion(rayJobInstance.Spec.RayClusterSpec.RayVersion); err == nil { + // Ray 2.53.0 introduced a unified HTTP health endpoint; slim images without wget exist for newer versions. 
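+				// If the version string cannot be parsed, keep the Python-based check: python3 is
+				// present in every Ray image, whereas wget is not guaranteed on slim images.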
+ minVersion := semver.MustParse("2.53.0") + if v.LessThan(minVersion) { + rayDashboardGCSHealthCommand = fmt.Sprintf( + utils.BaseWgetHealthCommand, + utils.DefaultReadinessProbeFailureThreshold, + port, + utils.RayDashboardGCSHealthPath, + ) + } + } + } waitLoop := []string{ "until", rayDashboardGCSHealthCommand, ">/dev/null", "2>&1", ";", diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index 4e99b8479e3..4533ec12110 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -443,27 +443,33 @@ func initLivenessAndReadinessProbe(rayContainer *corev1.Container, rayNodeType r }, } - rayAgentRayletHealthCommand := fmt.Sprintf( - utils.BaseWgetHealthCommand, - utils.DefaultReadinessProbeTimeoutSeconds, - getPort("dashboard-agent-listen-port", utils.DefaultDashboardAgentListenPort), - utils.RayAgentRayletHealthPath, - ) - rayDashboardGCSHealthCommand := fmt.Sprintf( - utils.BaseWgetHealthCommand, - utils.DefaultReadinessProbeFailureThreshold, - getPort("dashboard-port", utils.DefaultDashboardPort), - utils.RayDashboardGCSHealthPath, - ) - + // For Ray < 2.53, liveness/readiness use exec probes (bash) and rely on CLI tools. // Generally, the liveness and readiness probes perform the same checks. // For head node => Check GCS and Raylet status. // For worker node => Check Raylet status. commands := []string{} - if rayNodeType == rayv1.HeadNode { - commands = append(commands, rayAgentRayletHealthCommand, rayDashboardGCSHealthCommand) - } else { - commands = append(commands, rayAgentRayletHealthCommand) + if !httpHealthCheck { + dashboardAgentPort := getPort("dashboard-agent-listen-port", utils.DefaultDashboardAgentListenPort) + dashboardPort := getPort("dashboard-port", utils.DefaultDashboardPort) + + rayAgentRayletHealthCommand := fmt.Sprintf( + utils.BaseWgetHealthCommand, + utils.DefaultReadinessProbeTimeoutSeconds, + dashboardAgentPort, + utils.RayAgentRayletHealthPath, + ) + rayDashboardGCSHealthCommand := fmt.Sprintf( + utils.BaseWgetHealthCommand, + utils.DefaultReadinessProbeFailureThreshold, + dashboardPort, + utils.RayDashboardGCSHealthPath, + ) + + if rayNodeType == rayv1.HeadNode { + commands = append(commands, rayAgentRayletHealthCommand, rayDashboardGCSHealthCommand) + } else { + commands = append(commands, rayAgentRayletHealthCommand) + } } if rayContainer.LivenessProbe == nil { @@ -504,20 +510,17 @@ func initLivenessAndReadinessProbe(rayContainer *corev1.Container, rayNodeType r rayContainer.ReadinessProbe.Exec = &corev1.ExecAction{Command: []string{"bash", "-c", strings.Join(commands, " && ")}} } - // For worker Pods serving traffic, we need to add an additional HTTP proxy health check for the readiness probe. - // Note: head Pod checks the HTTP proxy's health at every rayservice controller reconcile instaed of using readiness probe. + // For worker Pods serving traffic, readiness checks Ray Serve proxy health only (liveness covers node health). + // Note: head Pod checks the HTTP proxy's health at every rayservice controller reconcile instead of using readiness probe. // See https://github.com/ray-project/kuberay/pull/1808 for reasons. 
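+	// The Serve proxy exposes /-/healthz on the serving port, so the readiness check can be a plain
+	// HTTPGet probe run by the kubelet, with no dependency on wget or python3 inside the container.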
if creatorCRDType == utils.RayServiceCRD && rayNodeType == rayv1.WorkerNode { rayContainer.ReadinessProbe.FailureThreshold = utils.ServeReadinessProbeFailureThreshold - rayServeProxyHealthCommand := fmt.Sprintf( - utils.BaseWgetHealthCommand, - utils.DefaultReadinessProbeInitialDelaySeconds, - utils.FindContainerPort(rayContainer, utils.ServingPortName, utils.DefaultServingPort), - utils.RayServeProxyHealthPath, - ) - commands = append(commands, rayServeProxyHealthCommand) - rayContainer.ReadinessProbe.HTTPGet = nil - rayContainer.ReadinessProbe.Exec = &corev1.ExecAction{Command: []string{"bash", "-c", strings.Join(commands, " && ")}} + servingPort := utils.FindContainerPort(rayContainer, utils.ServingPortName, utils.DefaultServingPort) + rayContainer.ReadinessProbe.HTTPGet = &corev1.HTTPGetAction{ + Path: "/" + utils.RayServeProxyHealthPath, + Port: intstr.IntOrString{Type: intstr.Int, IntVal: servingPort}, + } + rayContainer.ReadinessProbe.Exec = nil } } } diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index c953d7a8e39..77d1078eb33 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -1693,16 +1693,18 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) { assert.Nil(t, rayContainer.LivenessProbe.Exec) assert.Nil(t, rayContainer.ReadinessProbe.Exec) - // Test 2: User does not define a custom probe. KubeRay will inject Exec probe for worker pod. - // Here we test the case where the Ray Pod originates from RayServiceCRD, - // implying that an additional serve health check will be added to the readiness probe. + // Test 2: User does not define a custom probe. RayService worker: liveness = exec (node health), readiness = HTTPGet /-/healthz (Serve proxy). 
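+	// An empty Ray version cannot be parsed, so the legacy (pre-2.53) wget-based exec path is exercised.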
rayContainer.LivenessProbe = nil rayContainer.ReadinessProbe = nil initLivenessAndReadinessProbe(rayContainer, rayv1.WorkerNode, utils.RayServiceCRD, rayStartParams, "") assert.NotNil(t, rayContainer.LivenessProbe.Exec) - assert.NotNil(t, rayContainer.ReadinessProbe.Exec) - assert.NotContains(t, strings.Join(rayContainer.LivenessProbe.Exec.Command, " "), utils.RayServeProxyHealthPath) - assert.Contains(t, strings.Join(rayContainer.ReadinessProbe.Exec.Command, " "), utils.RayServeProxyHealthPath) + livenessCmd := strings.Join(rayContainer.LivenessProbe.Exec.Command, " ") + assert.Contains(t, livenessCmd, "wget", "exec probe should use wget for Ray < 2.53") + assert.NotContains(t, livenessCmd, "python3", "exec probe should not require Python for Ray < 2.53") + assert.NotContains(t, livenessCmd, utils.RayServeProxyHealthPath) + assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet, "RayService worker readiness should use HTTP probe for Serve proxy") + assert.Nil(t, rayContainer.ReadinessProbe.Exec) + assert.Equal(t, "/"+utils.RayServeProxyHealthPath, rayContainer.ReadinessProbe.HTTPGet.Path) assert.Equal(t, int32(2), rayContainer.LivenessProbe.TimeoutSeconds) assert.Equal(t, int32(2), rayContainer.ReadinessProbe.TimeoutSeconds) @@ -1734,6 +1736,8 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) { livenessCommand := strings.Join(rayContainer.LivenessProbe.Exec.Command, " ") readinessCommand := strings.Join(rayContainer.ReadinessProbe.Exec.Command, " ") + assert.Contains(t, livenessCommand, "wget", "exec probe should use wget for Ray < 2.53") + assert.NotContains(t, livenessCommand, "python3") assert.Contains(t, livenessCommand, ":8266", "Head pod liveness probe should use custom dashboard-agent-listen-port") assert.Contains(t, livenessCommand, ":8365", "Head pod liveness probe should use custom dashboard-port") assert.Contains(t, readinessCommand, ":8266", "Head pod readiness probe should use custom dashboard-agent-listen-port") @@ -1757,7 +1761,7 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) { assert.NotContains(t, workerLivenessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardPort), "Worker pod should not check dashboard-port") assert.NotContains(t, workerReadinessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardPort), "Worker pod should not check dashboard-port") - // Test 6: Test RayService worker with custom ports and serve proxy health check + // Test 6: Test RayService worker with custom dashboard port and serve proxy health check (readiness = HTTPGet /-/healthz). 
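+	// The custom dashboard ports only affect the exec liveness probe; the readiness HTTPGet targets the serving port.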
rayContainer.LivenessProbe = nil rayContainer.ReadinessProbe = nil rayContainer.Ports = []corev1.ContainerPort{ @@ -1770,9 +1774,10 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) { "dashboard-agent-listen-port": "8500", } initLivenessAndReadinessProbe(rayContainer, rayv1.WorkerNode, utils.RayServiceCRD, rayServiceWorkerParams, "") - rayServiceReadinessCommand := strings.Join(rayContainer.ReadinessProbe.Exec.Command, " ") - assert.Contains(t, rayServiceReadinessCommand, ":8500", "RayService worker should use custom dashboard-agent-listen-port") - assert.Contains(t, rayServiceReadinessCommand, utils.RayServeProxyHealthPath, "RayService worker should include serve proxy health check") + assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet, "RayService worker readiness should use HTTP probe for Serve proxy") + assert.Nil(t, rayContainer.ReadinessProbe.Exec) + assert.Equal(t, "/"+utils.RayServeProxyHealthPath, rayContainer.ReadinessProbe.HTTPGet.Path) + assert.Equal(t, int32(utils.DefaultServingPort), rayContainer.ReadinessProbe.HTTPGet.Port.IntVal) assert.Equal(t, int32(utils.ServeReadinessProbeFailureThreshold), rayContainer.ReadinessProbe.FailureThreshold, "RayService worker should have correct failure threshold") // Test 8: Test invalid port values (should fall back to defaults) @@ -1786,6 +1791,8 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) { invalidPortLivenessCommand := strings.Join(rayContainer.LivenessProbe.Exec.Command, " ") + assert.Contains(t, invalidPortLivenessCommand, "wget", "exec probe should use wget for Ray < 2.53") + assert.NotContains(t, invalidPortLivenessCommand, "python3") // Should fall back to default ports when invalid values are provided assert.Contains(t, invalidPortLivenessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardAgentListenPort), "Should fall back to default dashboard-agent-listen-port for invalid input") assert.Contains(t, invalidPortLivenessCommand, fmt.Sprintf(":%d", utils.DefaultDashboardPort), "Should fall back to default dashboard-port for invalid input") @@ -1811,15 +1818,15 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) { assert.NotNil(t, rayContainer.LivenessProbe.HTTPGet) assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet) - // Ray Serve workers still use exec probes for readiness to check the proxy actor. + // Ray Serve workers: liveness = HTTPGet /api/healthz (node), readiness = HTTPGet /-/healthz (Serve proxy). rayContainer.LivenessProbe = nil rayContainer.ReadinessProbe = nil initLivenessAndReadinessProbe(rayContainer, rayv1.WorkerNode, utils.RayServiceCRD, rayStartParams, "2.53.0") assert.NotNil(t, rayContainer.LivenessProbe.HTTPGet) assert.Nil(t, rayContainer.LivenessProbe.Exec) - assert.Nil(t, rayContainer.ReadinessProbe.HTTPGet) - assert.NotNil(t, rayContainer.ReadinessProbe.Exec) - assert.Contains(t, strings.Join(rayContainer.ReadinessProbe.Exec.Command, " "), utils.RayServeProxyHealthPath) + assert.NotNil(t, rayContainer.ReadinessProbe.HTTPGet, "RayService worker readiness should use HTTP probe for Serve proxy") + assert.Nil(t, rayContainer.ReadinessProbe.Exec) + assert.Equal(t, "/"+utils.RayServeProxyHealthPath, rayContainer.ReadinessProbe.HTTPGet.Path) // Versions parsed below 2.53 must use exec probes. 
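+	// Reset both probes so the next initLivenessAndReadinessProbe call rebuilds them from scratch.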
	rayContainer.LivenessProbe = nil

diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go
index 8760dcc0a31..12927a05acd 100644
--- a/ray-operator/controllers/ray/utils/constant.go
+++ b/ray-operator/controllers/ray/utils/constant.go
@@ -244,8 +244,13 @@ const (
 	RayAgentRayletHealthPath  = "api/local_raylet_healthz"
 	RayDashboardGCSHealthPath = "api/gcs_healthz"
 	RayServeProxyHealthPath   = "-/healthz"
-	BaseWgetHealthCommand     = "wget --tries 1 -T %d -q -O- http://localhost:%d/%s | grep success"
-	RayNodeHealthPath         = "/api/healthz"
+	// BaseWgetHealthCommand checks a single health URL with one attempt (no retries); args: timeout_sec, port, path (no leading slash).
+	// This is used for Ray versions that rely on exec probes and assume common CLI tools exist in the image.
+	BaseWgetHealthCommand = `wget --tries 1 -q -T %d -O- http://localhost:%d/%s | grep -q success`
+	// BasePythonHealthCommand checks a single health URL; args: port, path (no leading slash), timeout_sec.
+	// This is used when wget is not available (e.g. slim Ray images).
+	BasePythonHealthCommand = `python3 -c "import urllib.request; r=urllib.request.urlopen('http://localhost:%d/%s', timeout=%d); exit(0 if b'success' in r.read() else 1)"`
+	RayNodeHealthPath = "/api/healthz"
 
 	// Finalizers for RayJob
 	RayJobStopJobFinalizer = "ray.io/rayjob-finalizer"