diff --git a/release/ray_release/glue.py b/release/ray_release/glue.py
index e9b4b6559ba6..53022d7e7acf 100644
--- a/release/ray_release/glue.py
+++ b/release/ray_release/glue.py
@@ -421,42 +421,61 @@ def run_release_test_kuberay(
     smoke_test: bool = False,
     test_definition_root: Optional[str] = None,
 ) -> Result:
-    result.stable = test.get("stable", True)
-    result.smoke_test = smoke_test
-    cluster_compute = load_test_cluster_compute(test, test_definition_root)
-    kuberay_compute_config = convert_cluster_compute_to_kuberay_compute_config(
-        cluster_compute
-    )
-    kuberay_autoscaler_version = cluster_compute.get("autoscaler_version", None)
-    if kuberay_autoscaler_version:
-        kuberay_autoscaler_config = {"version": kuberay_autoscaler_version}
-    else:
-        kuberay_autoscaler_config = None
-    working_dir_upload_path = upload_working_dir(get_working_dir(test))
+    start_time = time.monotonic()
+    pipeline_exception = None
+    try:
+        result.stable = test.get("stable", True)
+        result.smoke_test = smoke_test
+        cluster_compute = load_test_cluster_compute(test, test_definition_root)
+        kuberay_compute_config = convert_cluster_compute_to_kuberay_compute_config(
+            cluster_compute
+        )
+        kuberay_autoscaler_version = cluster_compute.get("autoscaler_version", None)
+        if kuberay_autoscaler_version:
+            kuberay_autoscaler_config = {"version": kuberay_autoscaler_version}
+        else:
+            kuberay_autoscaler_config = None
+        working_dir_upload_path = upload_working_dir(get_working_dir(test))
+
+        command_timeout = int(test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT))
+        test_name_hash = hashlib.sha256(test["name"].encode()).hexdigest()[:10]
+        # random 8 digit suffix
+        random_suffix = "".join(random.choices(string.digits, k=8))
+        base_job_name = f"{test['name'][:20]}-{test_name_hash}-{random_suffix}"
+        job_name = base_job_name.replace("_", "-")
+        logger.info(f"Job name: {job_name}")
+        kuberay_job_manager = KubeRayJobManager()
+        retcode, duration = kuberay_job_manager.run_and_wait(
+            job_name=job_name,
+            image=test.get_anyscale_byod_image(),
+            cmd_to_run=test["run"]["script"],
+            env_vars=test.get_byod_runtime_env(),
+            working_dir=working_dir_upload_path,
+            pip=test.get_byod_pips(),
+            compute_config=kuberay_compute_config,
+            autoscaler_config=kuberay_autoscaler_config,
+            timeout=command_timeout,
+        )
+        kuberay_job_manager.fetch_results()
+        result.return_code = retcode
+        result.runtime = duration
+    except Exception as e:
+        logger.info(f"Exception: {e}")
+        pipeline_exception = e
+        result.runtime = time.monotonic() - start_time

-    command_timeout = int(test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT))
-    test_name_hash = hashlib.sha256(test["name"].encode()).hexdigest()[:10]
-    # random 8 digit suffix
-    random_suffix = "".join(random.choices(string.digits, k=8))
-    job_name = f"{test['name'][:20]}-{test_name_hash}-{random_suffix}".replace("_", "-")
-    logger.info(f"Job name: {job_name}")
-    logger.info(f"KubeRay compute config: {kuberay_compute_config}")
-    logger.info(f"KubeRay autoscaler config: {kuberay_autoscaler_config}")
-    kuberay_job_manager = KubeRayJobManager()
-    retcode, duration = kuberay_job_manager.run_and_wait(
-        job_name=job_name,
-        image=test.get_anyscale_byod_image(),
-        cmd_to_run=test["run"]["script"],
-        env_vars=test.get_byod_runtime_env(),
-        working_dir=working_dir_upload_path,
-        pip=test.get_byod_pips(),
-        compute_config=kuberay_compute_config,
-        autoscaler_config=kuberay_autoscaler_config,
-        timeout=command_timeout,
-    )
-    kuberay_job_manager.fetch_results()
-    result.return_code = retcode
-    result.runtime = duration
+    if pipeline_exception:
+        buildkite_group(":rotating_light: Handling errors")
+        exit_code, result_status, runtime = handle_exception(
+            pipeline_exception,
+            result.runtime,
+        )
+
+        result.return_code = exit_code.value
+        result.status = result_status.value
+        if runtime is not None:
+            result.runtime = runtime
+        raise pipeline_exception

     return result
diff --git a/release/ray_release/tests/test_glue.py b/release/ray_release/tests/test_glue.py
index 4daa7d765ca3..07af047f15d8 100644
--- a/release/ray_release/tests/test_glue.py
+++ b/release/ray_release/tests/test_glue.py
@@ -87,7 +87,9 @@ def setUp(self) -> None:
         self.sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))

         self.writeClusterEnv("{'env': true}")
-        self.writeClusterCompute("{'compute': true}")
+        self.writeClusterCompute(
+            "{'head_node_type': {'name': 'head_node', 'instance_type': 'm5a.4xlarge'}, 'worker_node_types': []}"
+        )

         with open(os.path.join(self.tempdir, "driver_fail.sh"), "wt") as f:
             f.write("exit 1\n")
@@ -175,6 +177,23 @@ def mock_alerter(test: Test, result: Result):
             ),
             alert="unit_test_alerter",
         )
+        self.kuberay_test = MockTest(
+            name="unit_test_end_to_end_kuberay",
+            run=dict(
+                type="unit_test",
+                prepare="prepare_cmd",
+                script="test_cmd",
+                wait_for_nodes=dict(num_nodes=4, timeout=40),
+            ),
+            working_dir=self.tempdir,
+            cluster=dict(
+                cluster_env="cluster_env.yaml",
+                cluster_compute="cluster_compute.yaml",
+                byod={},
+            ),
+            env="kuberay",
+            alert="unit_test_alerter",
+        )
         self.anyscale_project = "prj_unit12345678"

     def tearDown(self) -> None:
@@ -237,18 +256,27 @@ def _succeed_until(self, until: str):
         self.mock_alert_return = None

-    def _run(self, result: Result, **kwargs):
-        run_release_test(
-            test=self.test,
-            anyscale_project=self.anyscale_project,
-            result=result,
-            log_streaming_limit=1000,
-            **kwargs
-        )
+    def _run(self, result: Result, kuberay: bool = False, **kwargs):
+        if kuberay:
+            run_release_test(
+                test=self.kuberay_test,
+                result=result,
+                log_streaming_limit=1000,
+                **kwargs
+            )
+        else:
+            run_release_test(
+                test=self.test,
+                anyscale_project=self.anyscale_project,
+                result=result,
+                log_streaming_limit=1000,
+                **kwargs
+            )

     def testInvalidClusterCompute(self):
         result = Result()
+        # Test with regular run
         with patch(
             "ray_release.glue.load_test_cluster_compute",
             _fail_on_call(ReleaseTestConfigError),
@@ -256,23 +284,39 @@ def testInvalidClusterCompute(self):
             self._run(result)
         self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

+        # Test with kuberay run
+        with patch(
+            "ray_release.glue.load_test_cluster_compute",
+            _fail_on_call(ReleaseTestConfigError),
+        ), self.assertRaises(ReleaseTestConfigError):
+            self._run(result, True)
+        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)
+
         # Fails because file not found
         os.unlink(os.path.join(self.tempdir, "cluster_compute.yaml"))
         with self.assertRaisesRegex(ReleaseTestConfigError, "Path not found"):
             self._run(result)
         self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)
+        with self.assertRaisesRegex(ReleaseTestConfigError, "Path not found"):
+            self._run(result, True)
+        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

         # Fails because invalid jinja template
         self.writeClusterCompute("{{ INVALID")
         with self.assertRaisesRegex(ReleaseTestConfigError, "yaml template"):
             self._run(result)
         self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)
+        with self.assertRaisesRegex(ReleaseTestConfigError, "yaml template"):
+            self._run(result, True)
+        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

         # Fails because invalid json
         self.writeClusterCompute("{'test': true, 'fail}")
         with self.assertRaisesRegex(ReleaseTestConfigError, "quoted scalar"):
             self._run(result)
-
+        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)
+        with self.assertRaisesRegex(ReleaseTestConfigError, "quoted scalar"):
+            self._run(result, True)
         self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

     def testStartClusterFails(self):